Skip to content

Commit

Permalink
chore(data-warehouse): Refactor temporal env var (#18824)
Browse files Browse the repository at this point in the history
* update env var, update folders so there are common and specific workflow modules

* update workflow

* update imports

* reformat

* already sync

* revert task queue change

* update github action and env vars

* remap

* move
  • Loading branch information
EDsCODE authored Nov 29, 2023
1 parent f2a4b17 commit daac935
Show file tree
Hide file tree
Showing 54 changed files with 213 additions and 170 deletions.
13 changes: 7 additions & 6 deletions .github/workflows/container-images-cd.yml
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,13 @@ jobs:
"image_tag": "${{ steps.build.outputs.digest }}"
}
- name: Check for changes that affect temporal worker
id: check_changes_temporal_worker
- name: Check for changes that affect batch exports temporal worker
id: check_changes_batch_exports_temporal_worker
run: |
echo "::set-output name=changed::$(git diff --name-only HEAD^ HEAD | grep -E '^posthog/temporal/|^posthog/batch_exports/|^posthog/management/commands/start_temporal_worker.py$' || true)"
echo "::set-output name=changed::$(git diff --name-only HEAD^ HEAD | grep -E '^posthog/temporal/common|^posthog/temporal/batch_exports|^posthog/batch_exports/|^posthog/management/commands/start_temporal_worker.py$' || true)"
- name: Trigger Temporal Worker Cloud deployment
if: steps.check_changes_temporal_worker.outputs.changed != ''
- name: Trigger Batch Exports Temporal Worker Cloud deployment
if: steps.check_changes_batch_exports_temporal_worker.outputs.changed != ''
uses: mvasigh/dispatch-action@main
with:
token: ${{ steps.deployer.outputs.token }}
Expand All @@ -139,5 +139,6 @@ jobs:
event_type: temporal_worker_deploy
message: |
{
"image_tag": "${{ steps.build.outputs.digest }}"
"image_tag": "${{ steps.build.outputs.digest }}",
"worker_name": "temporal-worker"
}
4 changes: 2 additions & 2 deletions posthog/api/test/batch_exports/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
from temporalio.worker import UnsandboxedWorkflowRunner, Worker

from posthog.batch_exports.models import BatchExport
from posthog.temporal.client import sync_connect
from posthog.temporal.workflows import ACTIVITIES, WORKFLOWS
from posthog.temporal.common.client import sync_connect
from posthog.temporal.batch_exports import ACTIVITIES, WORKFLOWS


class ThreadedWorker(Worker):
Expand Down
2 changes: 1 addition & 1 deletion posthog/api/test/batch_exports/test_backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from posthog.api.test.test_organization import create_organization
from posthog.api.test.test_team import create_team
from posthog.api.test.test_user import create_user
from posthog.temporal.client import sync_connect
from posthog.temporal.common.client import sync_connect

pytestmark = [
pytest.mark.django_db,
Expand Down
4 changes: 2 additions & 2 deletions posthog/api/test/batch_exports/test_create.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
from posthog.api.test.test_team import create_team
from posthog.api.test.test_user import create_user
from posthog.batch_exports.models import BatchExport
from posthog.temporal.client import sync_connect
from posthog.temporal.codec import EncryptionCodec
from posthog.temporal.common.client import sync_connect
from posthog.temporal.common.codec import EncryptionCodec

pytestmark = [
pytest.mark.django_db,
Expand Down
4 changes: 2 additions & 2 deletions posthog/api/test/batch_exports/test_delete.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
from posthog.api.test.test_organization import create_organization
from posthog.api.test.test_team import create_team
from posthog.api.test.test_user import create_user
from posthog.batch_exports.service import describe_schedule
from posthog.temporal.client import sync_connect
from posthog.temporal.common.schedule import describe_schedule
from posthog.temporal.common.client import sync_connect

pytestmark = [
pytest.mark.django_db,
Expand Down
2 changes: 1 addition & 1 deletion posthog/api/test/batch_exports/test_get.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from posthog.api.test.test_organization import create_organization
from posthog.api.test.test_team import create_team
from posthog.api.test.test_user import create_user
from posthog.temporal.client import sync_connect
from posthog.temporal.common.client import sync_connect

pytestmark = [
pytest.mark.django_db,
Expand Down
2 changes: 1 addition & 1 deletion posthog/api/test/batch_exports/test_log_entry.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
fetch_batch_export_log_entries,
)
from posthog.client import sync_execute
from posthog.temporal.client import sync_connect
from posthog.temporal.common.client import sync_connect


def create_batch_export_log_entry(
Expand Down
7 changes: 4 additions & 3 deletions posthog/api/test/batch_exports/test_pause.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
from posthog.api.test.test_organization import create_organization
from posthog.api.test.test_team import create_team
from posthog.api.test.test_user import create_user
from posthog.batch_exports.service import delete_schedule, describe_schedule
from posthog.temporal.client import sync_connect
from posthog.batch_exports.service import batch_export_delete_schedule
from posthog.temporal.common.schedule import describe_schedule
from posthog.temporal.common.client import sync_connect

pytestmark = [
pytest.mark.django_db,
Expand Down Expand Up @@ -344,7 +345,7 @@ def test_pause_non_existent_batch_export(client: HttpClient):
schedule_desc = describe_schedule(temporal, batch_export["id"])
assert schedule_desc.schedule.state.paused is False

resp = delete_schedule(temporal, batch_export["id"])
resp = batch_export_delete_schedule(temporal, batch_export["id"])

batch_export_id = batch_export["id"]
resp = pause_batch_export(client, team.pk, batch_export_id)
Expand Down
2 changes: 1 addition & 1 deletion posthog/api/test/batch_exports/test_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from posthog.api.test.test_organization import create_organization
from posthog.api.test.test_team import create_team
from posthog.api.test.test_user import create_user
from posthog.temporal.client import sync_connect
from posthog.temporal.common.client import sync_connect

pytestmark = [
pytest.mark.django_db,
Expand Down
4 changes: 2 additions & 2 deletions posthog/api/test/batch_exports/test_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
from posthog.api.test.test_user import create_user
from posthog.batch_exports.service import sync_batch_export
from posthog.models import BatchExport, BatchExportDestination
from posthog.temporal.client import sync_connect
from posthog.temporal.codec import EncryptionCodec
from posthog.temporal.common.client import sync_connect
from posthog.temporal.common.codec import EncryptionCodec

pytestmark = [
pytest.mark.django_db,
Expand Down
4 changes: 2 additions & 2 deletions posthog/api/test/test_team.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from temporalio.service import RPCError

from posthog.api.test.batch_exports.conftest import start_test_worker
from posthog.batch_exports.service import describe_schedule
from posthog.temporal.common.schedule import describe_schedule
from posthog.constants import AvailableFeature
from posthog.models import EarlyAccessFeature
from posthog.models.async_deletion.async_deletion import AsyncDeletion, DeletionType
Expand All @@ -19,7 +19,7 @@
from posthog.models.organization import Organization, OrganizationMembership
from posthog.models.team import Team
from posthog.models.team.team import get_team_in_cache
from posthog.temporal.client import sync_connect
from posthog.temporal.common.client import sync_connect
from posthog.test.base import APIBaseTest


Expand Down
6 changes: 3 additions & 3 deletions posthog/batch_exports/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
BatchExportServiceScheduleNotFound,
backfill_export,
cancel_running_batch_export_backfill,
delete_schedule,
batch_export_delete_schedule,
pause_batch_export,
sync_batch_export,
unpause_batch_export,
Expand All @@ -48,7 +48,7 @@
ProjectMembershipNecessaryPermissions,
TeamMemberAccessPermission,
)
from posthog.temporal.client import sync_connect
from posthog.temporal.common.client import sync_connect
from posthog.utils import relative_date_parse

logger = structlog.get_logger(__name__)
Expand Down Expand Up @@ -336,7 +336,7 @@ def perform_destroy(self, instance: BatchExport):
instance.deleted = True

try:
delete_schedule(temporal, str(instance.pk))
batch_export_delete_schedule(temporal, str(instance.pk))
except BatchExportServiceScheduleNotFound as e:
logger.warning("The Schedule %s could not be deleted as it was not found", e.schedule_id)

Expand Down
62 changes: 10 additions & 52 deletions posthog/batch_exports/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
SchedulePolicy,
ScheduleSpec,
ScheduleState,
ScheduleUpdate,
ScheduleUpdateInput,
)

from posthog import settings
Expand All @@ -24,7 +22,14 @@
BatchExportBackfill,
BatchExportRun,
)
from posthog.temporal.client import sync_connect
from posthog.temporal.common.client import sync_connect
from posthog.temporal.common.schedule import (
create_schedule,
update_schedule,
unpause_schedule,
pause_schedule,
delete_schedule,
)


class BatchExportsInputsProtocol(typing.Protocol):
Expand Down Expand Up @@ -195,13 +200,6 @@ def pause_batch_export(temporal: Client, batch_export_id: str, note: str | None
batch_export.save()


@async_to_sync
async def pause_schedule(temporal: Client, schedule_id: str, note: str | None = None) -> None:
"""Pause a Temporal Schedule."""
handle = temporal.get_schedule_handle(schedule_id)
await handle.pause(note=note)


def unpause_batch_export(
temporal: Client,
batch_export_id: str,
Expand Down Expand Up @@ -248,34 +246,17 @@ def unpause_batch_export(
backfill_export(temporal, batch_export_id, start_at, end_at)


@async_to_sync
async def unpause_schedule(temporal: Client, schedule_id: str, note: str | None = None) -> None:
"""Unpause a Temporal Schedule."""
handle = temporal.get_schedule_handle(schedule_id)
await handle.unpause(note=note)


@async_to_sync
async def delete_schedule(temporal: Client, schedule_id: str) -> None:
def batch_export_delete_schedule(temporal: Client, schedule_id: str) -> None:
"""Delete a Temporal Schedule."""
handle = temporal.get_schedule_handle(schedule_id)

try:
await handle.delete()
delete_schedule(temporal, schedule_id)
except temporalio.service.RPCError as e:
if e.status == temporalio.service.RPCStatusCode.NOT_FOUND:
raise BatchExportServiceScheduleNotFound(schedule_id)
else:
raise BatchExportServiceRPCError() from e


@async_to_sync
async def describe_schedule(temporal: Client, schedule_id: str):
"""Describe a Temporal Schedule."""
handle = temporal.get_schedule_handle(schedule_id)
return await handle.describe()


@async_to_sync
async def cancel_running_batch_export_backfill(temporal: Client, workflow_id: str) -> None:
"""Delete a running BatchExportBackfill.
Expand Down Expand Up @@ -438,29 +419,6 @@ def sync_batch_export(batch_export: BatchExport, created: bool):
return batch_export


@async_to_sync
async def create_schedule(temporal: Client, id: str, schedule: Schedule, trigger_immediately: bool = False):
"""Create a Temporal Schedule."""
return await temporal.create_schedule(
id=id,
schedule=schedule,
trigger_immediately=trigger_immediately,
)


@async_to_sync
async def update_schedule(temporal: Client, id: str, schedule: Schedule) -> None:
"""Update a Temporal Schedule."""
handle = temporal.get_schedule_handle(id)

async def updater(_: ScheduleUpdateInput) -> ScheduleUpdate:
return ScheduleUpdate(schedule=schedule)

return await handle.update(
updater=updater,
)


def create_batch_export_backfill(
batch_export_id: UUID,
team_id: int,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from posthog.batch_exports.models import BatchExport, BatchExportDestination
from posthog.batch_exports.service import backfill_export, sync_batch_export
from posthog.models.plugin import PluginAttachment, PluginConfig
from posthog.temporal.client import sync_connect
from posthog.temporal.common.client import sync_connect


class Command(BaseCommand):
Expand Down
4 changes: 2 additions & 2 deletions posthog/management/commands/execute_temporal_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
from django.core.management.base import BaseCommand
from temporalio.common import RetryPolicy, WorkflowIDReusePolicy

from posthog.temporal.client import connect
from posthog.temporal.workflows import WORKFLOWS
from posthog.temporal.common.client import connect
from posthog.temporal.batch_exports import WORKFLOWS


class Command(BaseCommand):
Expand Down
18 changes: 17 additions & 1 deletion posthog/management/commands/start_temporal_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,15 @@
from django.conf import settings
from django.core.management.base import BaseCommand

from posthog.temporal.worker import start_worker
from posthog.temporal.common.worker import start_worker
from posthog.temporal.batch_exports import WORKFLOWS as BATCH_EXPORTS_WORKFLOWS, ACTIVITIES as BATCH_EXPORTS_ACTIVITIES

WORKFLOWS_DICT = {
"no-sandbox-python-django": BATCH_EXPORTS_WORKFLOWS,
}
ACTIVITIES_DICT = {
"no-sandbox-python-django": BATCH_EXPORTS_ACTIVITIES,
}


class Command(BaseCommand):
Expand Down Expand Up @@ -65,6 +73,12 @@ def handle(self, *args, **options):
client_cert = options.get("client_cert", None)
client_key = options.get("client_key", None)

try:
workflows = WORKFLOWS_DICT[task_queue]
activities = ACTIVITIES_DICT[task_queue]
except KeyError:
raise ValueError(f"Task queue {task_queue} not found in WORKFLOWS_DICT or ACTIVITIES_DICT")

if options["client_key"]:
options["client_key"] = "--SECRET--"
logging.info(f"Starting Temporal Worker with options: {options}")
Expand All @@ -83,5 +97,7 @@ def handle(self, *args, **options):
server_root_ca_cert=server_root_ca_cert,
client_cert=client_cert,
client_key=client_key,
workflows=workflows,
activities=activities,
)
)
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
map_plugin_config_to_destination,
)
from posthog.models import Plugin, PluginAttachment, PluginConfig
from posthog.temporal.client import sync_connect
from posthog.temporal.codec import EncryptionCodec
from posthog.temporal.common.client import sync_connect
from posthog.temporal.common.codec import EncryptionCodec


@pytest.fixture
Expand Down
6 changes: 3 additions & 3 deletions posthog/models/team/util.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from datetime import timedelta
from typing import Any, List

from posthog.temporal.client import sync_connect
from posthog.batch_exports.service import delete_schedule
from posthog.temporal.common.client import sync_connect
from posthog.batch_exports.service import batch_export_delete_schedule
from posthog.cache_utils import cache_for
from posthog.models.async_migration import is_async_migration_complete

Expand Down Expand Up @@ -44,7 +44,7 @@ def delete_batch_exports(team_ids: List[int]):
batch_export.delete()
batch_export.destination.delete()

delete_schedule(temporal, str(schedule_id))
batch_export_delete_schedule(temporal, str(schedule_id))


can_enable_actor_on_events = False
Expand Down
1 change: 1 addition & 0 deletions posthog/temporal/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

Loading

0 comments on commit daac935

Please sign in to comment.