From daac935bf40b696edf6592d5bc5d1ff64e4b382f Mon Sep 17 00:00:00 2001 From: Eric Duong Date: Wed, 29 Nov 2023 09:32:38 -0500 Subject: [PATCH] chore(data-warehouse): Refactor temporal env var (#18824) * update env var, update folders so there are common and specific workflow modules * update workflow * update imports * reformat * already sync * revert task queue change * update github action and env vars * remap * move --- .github/workflows/container-images-cd.yml | 13 ++-- posthog/api/test/batch_exports/conftest.py | 4 +- .../api/test/batch_exports/test_backfill.py | 2 +- posthog/api/test/batch_exports/test_create.py | 4 +- posthog/api/test/batch_exports/test_delete.py | 4 +- posthog/api/test/batch_exports/test_get.py | 2 +- .../api/test/batch_exports/test_log_entry.py | 2 +- posthog/api/test/batch_exports/test_pause.py | 7 +- posthog/api/test/batch_exports/test_runs.py | 2 +- posthog/api/test/batch_exports/test_update.py | 4 +- posthog/api/test/test_team.py | 4 +- posthog/batch_exports/http.py | 6 +- posthog/batch_exports/service.py | 62 +++--------------- .../commands/create_batch_export_from_app.py | 2 +- .../commands/execute_temporal_workflow.py | 4 +- .../commands/start_temporal_worker.py | 18 ++++- .../test/test_create_batch_export_from_app.py | 4 +- posthog/models/team/util.py | 6 +- posthog/temporal/__init__.py | 1 + .../{workflows => batch_exports}/__init__.py | 18 ++--- .../backfill_batch_export.py | 6 +- .../{workflows => batch_exports}/base.py | 0 .../batch_exports.py | 4 +- .../bigquery_batch_export.py | 12 ++-- .../clickhouse.py | 0 .../{workflows => batch_exports}/logger.py | 0 .../{workflows => batch_exports}/metrics.py | 0 .../{workflows => batch_exports}/noop.py | 2 +- .../postgres_batch_export.py | 10 +-- .../redshift_batch_export.py | 12 ++-- .../s3_batch_export.py | 10 +-- .../snowflake_batch_export.py | 12 ++-- .../squash_person_overrides.py | 2 +- posthog/temporal/common/__init__.py | 0 posthog/temporal/{ => common}/client.py | 2 +- posthog/temporal/{ => common}/codec.py | 0 posthog/temporal/common/schedule.py | 65 +++++++++++++++++++ posthog/temporal/{ => common}/sentry.py | 0 posthog/temporal/{ => common}/utils.py | 0 posthog/temporal/{ => common}/worker.py | 11 ++-- .../test_backfill_batch_export.py | 2 +- .../tests/batch_exports/test_batch_exports.py | 2 +- .../test_bigquery_batch_export_workflow.py | 4 +- .../tests/batch_exports/test_logger.py | 2 +- .../test_postgres_batch_export_workflow.py | 4 +- .../test_redshift_batch_export_workflow.py | 4 +- .../tests/batch_exports/test_run_updates.py | 2 +- .../test_s3_batch_export_workflow.py | 18 ++--- .../test_snowflake_batch_export_workflow.py | 10 +-- posthog/temporal/tests/conftest.py | 8 +-- posthog/temporal/tests/test_clickhouse.py | 2 +- .../temporal/tests/test_encryption_codec.py | 4 +- .../test_squash_person_overrides_workflow.py | 2 +- posthog/temporal/tests/utils/events.py | 2 +- 54 files changed, 213 insertions(+), 170 deletions(-) rename posthog/temporal/{workflows => batch_exports}/__init__.py (71%) rename posthog/temporal/{workflows => batch_exports}/backfill_batch_export.py (98%) rename posthog/temporal/{workflows => batch_exports}/base.py (100%) rename posthog/temporal/{workflows => batch_exports}/batch_exports.py (99%) rename posthog/temporal/{workflows => batch_exports}/bigquery_batch_export.py (96%) rename posthog/temporal/{workflows => batch_exports}/clickhouse.py (100%) rename posthog/temporal/{workflows => batch_exports}/logger.py (100%) rename posthog/temporal/{workflows => batch_exports}/metrics.py (100%) rename posthog/temporal/{workflows => batch_exports}/noop.py (95%) rename posthog/temporal/{workflows => batch_exports}/postgres_batch_export.py (97%) rename posthog/temporal/{workflows => batch_exports}/redshift_batch_export.py (97%) rename posthog/temporal/{workflows => batch_exports}/s3_batch_export.py (98%) rename posthog/temporal/{workflows => batch_exports}/snowflake_batch_export.py (97%) rename posthog/temporal/{workflows => batch_exports}/squash_person_overrides.py (99%) create mode 100644 posthog/temporal/common/__init__.py rename posthog/temporal/{ => common}/client.py (95%) rename posthog/temporal/{ => common}/codec.py (100%) create mode 100644 posthog/temporal/common/schedule.py rename posthog/temporal/{ => common}/sentry.py (100%) rename posthog/temporal/{ => common}/utils.py (100%) rename posthog/temporal/{ => common}/worker.py (84%) diff --git a/.github/workflows/container-images-cd.yml b/.github/workflows/container-images-cd.yml index c7e64245640fd..5ea3625b72837 100644 --- a/.github/workflows/container-images-cd.yml +++ b/.github/workflows/container-images-cd.yml @@ -124,13 +124,13 @@ jobs: "image_tag": "${{ steps.build.outputs.digest }}" } - - name: Check for changes that affect temporal worker - id: check_changes_temporal_worker + - name: Check for changes that affect batch exports temporal worker + id: check_changes_batch_exports_temporal_worker run: | - echo "::set-output name=changed::$(git diff --name-only HEAD^ HEAD | grep -E '^posthog/temporal/|^posthog/batch_exports/|^posthog/management/commands/start_temporal_worker.py$' || true)" + echo "::set-output name=changed::$(git diff --name-only HEAD^ HEAD | grep -E '^posthog/temporal/common|^posthog/temporal/batch_exports|^posthog/batch_exports/|^posthog/management/commands/start_temporal_worker.py$' || true)" - - name: Trigger Temporal Worker Cloud deployment - if: steps.check_changes_temporal_worker.outputs.changed != '' + - name: Trigger Batch Exports Temporal Worker Cloud deployment + if: steps.check_changes_batch_exports_temporal_worker.outputs.changed != '' uses: mvasigh/dispatch-action@main with: token: ${{ steps.deployer.outputs.token }} @@ -139,5 +139,6 @@ jobs: event_type: temporal_worker_deploy message: | { - "image_tag": "${{ steps.build.outputs.digest }}" + "image_tag": "${{ steps.build.outputs.digest }}", + "worker_name": "temporal-worker" } diff --git a/posthog/api/test/batch_exports/conftest.py b/posthog/api/test/batch_exports/conftest.py index 3910b0ff69de8..5303e6c36c454 100644 --- a/posthog/api/test/batch_exports/conftest.py +++ b/posthog/api/test/batch_exports/conftest.py @@ -13,8 +13,8 @@ from temporalio.worker import UnsandboxedWorkflowRunner, Worker from posthog.batch_exports.models import BatchExport -from posthog.temporal.client import sync_connect -from posthog.temporal.workflows import ACTIVITIES, WORKFLOWS +from posthog.temporal.common.client import sync_connect +from posthog.temporal.batch_exports import ACTIVITIES, WORKFLOWS class ThreadedWorker(Worker): diff --git a/posthog/api/test/batch_exports/test_backfill.py b/posthog/api/test/batch_exports/test_backfill.py index a0c874717fb46..987dfd87464d3 100644 --- a/posthog/api/test/batch_exports/test_backfill.py +++ b/posthog/api/test/batch_exports/test_backfill.py @@ -10,7 +10,7 @@ from posthog.api.test.test_organization import create_organization from posthog.api.test.test_team import create_team from posthog.api.test.test_user import create_user -from posthog.temporal.client import sync_connect +from posthog.temporal.common.client import sync_connect pytestmark = [ pytest.mark.django_db, diff --git a/posthog/api/test/batch_exports/test_create.py b/posthog/api/test/batch_exports/test_create.py index d2f0a13d72a0b..5d5d6896ec0bf 100644 --- a/posthog/api/test/batch_exports/test_create.py +++ b/posthog/api/test/batch_exports/test_create.py @@ -13,8 +13,8 @@ from posthog.api.test.test_team import create_team from posthog.api.test.test_user import create_user from posthog.batch_exports.models import BatchExport -from posthog.temporal.client import sync_connect -from posthog.temporal.codec import EncryptionCodec +from posthog.temporal.common.client import sync_connect +from posthog.temporal.common.codec import EncryptionCodec pytestmark = [ pytest.mark.django_db, diff --git a/posthog/api/test/batch_exports/test_delete.py b/posthog/api/test/batch_exports/test_delete.py index cc07ed4675151..574f700327bc3 100644 --- a/posthog/api/test/batch_exports/test_delete.py +++ b/posthog/api/test/batch_exports/test_delete.py @@ -18,8 +18,8 @@ from posthog.api.test.test_organization import create_organization from posthog.api.test.test_team import create_team from posthog.api.test.test_user import create_user -from posthog.batch_exports.service import describe_schedule -from posthog.temporal.client import sync_connect +from posthog.temporal.common.schedule import describe_schedule +from posthog.temporal.common.client import sync_connect pytestmark = [ pytest.mark.django_db, diff --git a/posthog/api/test/batch_exports/test_get.py b/posthog/api/test/batch_exports/test_get.py index c7c168583fcd5..f5e0060bc67b5 100644 --- a/posthog/api/test/batch_exports/test_get.py +++ b/posthog/api/test/batch_exports/test_get.py @@ -10,7 +10,7 @@ from posthog.api.test.test_organization import create_organization from posthog.api.test.test_team import create_team from posthog.api.test.test_user import create_user -from posthog.temporal.client import sync_connect +from posthog.temporal.common.client import sync_connect pytestmark = [ pytest.mark.django_db, diff --git a/posthog/api/test/batch_exports/test_log_entry.py b/posthog/api/test/batch_exports/test_log_entry.py index 8012766464ffd..ad0ee797033b0 100644 --- a/posthog/api/test/batch_exports/test_log_entry.py +++ b/posthog/api/test/batch_exports/test_log_entry.py @@ -18,7 +18,7 @@ fetch_batch_export_log_entries, ) from posthog.client import sync_execute -from posthog.temporal.client import sync_connect +from posthog.temporal.common.client import sync_connect def create_batch_export_log_entry( diff --git a/posthog/api/test/batch_exports/test_pause.py b/posthog/api/test/batch_exports/test_pause.py index 3eb752331b762..2c92a00af94ee 100644 --- a/posthog/api/test/batch_exports/test_pause.py +++ b/posthog/api/test/batch_exports/test_pause.py @@ -17,8 +17,9 @@ from posthog.api.test.test_organization import create_organization from posthog.api.test.test_team import create_team from posthog.api.test.test_user import create_user -from posthog.batch_exports.service import delete_schedule, describe_schedule -from posthog.temporal.client import sync_connect +from posthog.batch_exports.service import batch_export_delete_schedule +from posthog.temporal.common.schedule import describe_schedule +from posthog.temporal.common.client import sync_connect pytestmark = [ pytest.mark.django_db, @@ -344,7 +345,7 @@ def test_pause_non_existent_batch_export(client: HttpClient): schedule_desc = describe_schedule(temporal, batch_export["id"]) assert schedule_desc.schedule.state.paused is False - resp = delete_schedule(temporal, batch_export["id"]) + resp = batch_export_delete_schedule(temporal, batch_export["id"]) batch_export_id = batch_export["id"] resp = pause_batch_export(client, team.pk, batch_export_id) diff --git a/posthog/api/test/batch_exports/test_runs.py b/posthog/api/test/batch_exports/test_runs.py index fa0ecae48f7a1..feac9b5c63e18 100644 --- a/posthog/api/test/batch_exports/test_runs.py +++ b/posthog/api/test/batch_exports/test_runs.py @@ -11,7 +11,7 @@ from posthog.api.test.test_organization import create_organization from posthog.api.test.test_team import create_team from posthog.api.test.test_user import create_user -from posthog.temporal.client import sync_connect +from posthog.temporal.common.client import sync_connect pytestmark = [ pytest.mark.django_db, diff --git a/posthog/api/test/batch_exports/test_update.py b/posthog/api/test/batch_exports/test_update.py index c186546856a5d..f91289025a76e 100644 --- a/posthog/api/test/batch_exports/test_update.py +++ b/posthog/api/test/batch_exports/test_update.py @@ -19,8 +19,8 @@ from posthog.api.test.test_user import create_user from posthog.batch_exports.service import sync_batch_export from posthog.models import BatchExport, BatchExportDestination -from posthog.temporal.client import sync_connect -from posthog.temporal.codec import EncryptionCodec +from posthog.temporal.common.client import sync_connect +from posthog.temporal.common.codec import EncryptionCodec pytestmark = [ pytest.mark.django_db, diff --git a/posthog/api/test/test_team.py b/posthog/api/test/test_team.py index 608e1eea5dbaf..6945defe10b3b 100644 --- a/posthog/api/test/test_team.py +++ b/posthog/api/test/test_team.py @@ -10,7 +10,7 @@ from temporalio.service import RPCError from posthog.api.test.batch_exports.conftest import start_test_worker -from posthog.batch_exports.service import describe_schedule +from posthog.temporal.common.schedule import describe_schedule from posthog.constants import AvailableFeature from posthog.models import EarlyAccessFeature from posthog.models.async_deletion.async_deletion import AsyncDeletion, DeletionType @@ -19,7 +19,7 @@ from posthog.models.organization import Organization, OrganizationMembership from posthog.models.team import Team from posthog.models.team.team import get_team_in_cache -from posthog.temporal.client import sync_connect +from posthog.temporal.common.client import sync_connect from posthog.test.base import APIBaseTest diff --git a/posthog/batch_exports/http.py b/posthog/batch_exports/http.py index cef17ab628f32..e896ac70e0be1 100644 --- a/posthog/batch_exports/http.py +++ b/posthog/batch_exports/http.py @@ -31,7 +31,7 @@ BatchExportServiceScheduleNotFound, backfill_export, cancel_running_batch_export_backfill, - delete_schedule, + batch_export_delete_schedule, pause_batch_export, sync_batch_export, unpause_batch_export, @@ -48,7 +48,7 @@ ProjectMembershipNecessaryPermissions, TeamMemberAccessPermission, ) -from posthog.temporal.client import sync_connect +from posthog.temporal.common.client import sync_connect from posthog.utils import relative_date_parse logger = structlog.get_logger(__name__) @@ -336,7 +336,7 @@ def perform_destroy(self, instance: BatchExport): instance.deleted = True try: - delete_schedule(temporal, str(instance.pk)) + batch_export_delete_schedule(temporal, str(instance.pk)) except BatchExportServiceScheduleNotFound as e: logger.warning("The Schedule %s could not be deleted as it was not found", e.schedule_id) diff --git a/posthog/batch_exports/service.py b/posthog/batch_exports/service.py index 38cecda263aaa..12169d295dd45 100644 --- a/posthog/batch_exports/service.py +++ b/posthog/batch_exports/service.py @@ -14,8 +14,6 @@ SchedulePolicy, ScheduleSpec, ScheduleState, - ScheduleUpdate, - ScheduleUpdateInput, ) from posthog import settings @@ -24,7 +22,14 @@ BatchExportBackfill, BatchExportRun, ) -from posthog.temporal.client import sync_connect +from posthog.temporal.common.client import sync_connect +from posthog.temporal.common.schedule import ( + create_schedule, + update_schedule, + unpause_schedule, + pause_schedule, + delete_schedule, +) class BatchExportsInputsProtocol(typing.Protocol): @@ -195,13 +200,6 @@ def pause_batch_export(temporal: Client, batch_export_id: str, note: str | None batch_export.save() -@async_to_sync -async def pause_schedule(temporal: Client, schedule_id: str, note: str | None = None) -> None: - """Pause a Temporal Schedule.""" - handle = temporal.get_schedule_handle(schedule_id) - await handle.pause(note=note) - - def unpause_batch_export( temporal: Client, batch_export_id: str, @@ -248,20 +246,10 @@ def unpause_batch_export( backfill_export(temporal, batch_export_id, start_at, end_at) -@async_to_sync -async def unpause_schedule(temporal: Client, schedule_id: str, note: str | None = None) -> None: - """Unpause a Temporal Schedule.""" - handle = temporal.get_schedule_handle(schedule_id) - await handle.unpause(note=note) - - -@async_to_sync -async def delete_schedule(temporal: Client, schedule_id: str) -> None: +def batch_export_delete_schedule(temporal: Client, schedule_id: str) -> None: """Delete a Temporal Schedule.""" - handle = temporal.get_schedule_handle(schedule_id) - try: - await handle.delete() + delete_schedule(temporal, schedule_id) except temporalio.service.RPCError as e: if e.status == temporalio.service.RPCStatusCode.NOT_FOUND: raise BatchExportServiceScheduleNotFound(schedule_id) @@ -269,13 +257,6 @@ async def delete_schedule(temporal: Client, schedule_id: str) -> None: raise BatchExportServiceRPCError() from e -@async_to_sync -async def describe_schedule(temporal: Client, schedule_id: str): - """Describe a Temporal Schedule.""" - handle = temporal.get_schedule_handle(schedule_id) - return await handle.describe() - - @async_to_sync async def cancel_running_batch_export_backfill(temporal: Client, workflow_id: str) -> None: """Delete a running BatchExportBackfill. @@ -438,29 +419,6 @@ def sync_batch_export(batch_export: BatchExport, created: bool): return batch_export -@async_to_sync -async def create_schedule(temporal: Client, id: str, schedule: Schedule, trigger_immediately: bool = False): - """Create a Temporal Schedule.""" - return await temporal.create_schedule( - id=id, - schedule=schedule, - trigger_immediately=trigger_immediately, - ) - - -@async_to_sync -async def update_schedule(temporal: Client, id: str, schedule: Schedule) -> None: - """Update a Temporal Schedule.""" - handle = temporal.get_schedule_handle(id) - - async def updater(_: ScheduleUpdateInput) -> ScheduleUpdate: - return ScheduleUpdate(schedule=schedule) - - return await handle.update( - updater=updater, - ) - - def create_batch_export_backfill( batch_export_id: UUID, team_id: int, diff --git a/posthog/management/commands/create_batch_export_from_app.py b/posthog/management/commands/create_batch_export_from_app.py index 9e6202427d701..b1939656a4fd6 100644 --- a/posthog/management/commands/create_batch_export_from_app.py +++ b/posthog/management/commands/create_batch_export_from_app.py @@ -7,7 +7,7 @@ from posthog.batch_exports.models import BatchExport, BatchExportDestination from posthog.batch_exports.service import backfill_export, sync_batch_export from posthog.models.plugin import PluginAttachment, PluginConfig -from posthog.temporal.client import sync_connect +from posthog.temporal.common.client import sync_connect class Command(BaseCommand): diff --git a/posthog/management/commands/execute_temporal_workflow.py b/posthog/management/commands/execute_temporal_workflow.py index df9f5d993fc07..e59574969072c 100644 --- a/posthog/management/commands/execute_temporal_workflow.py +++ b/posthog/management/commands/execute_temporal_workflow.py @@ -6,8 +6,8 @@ from django.core.management.base import BaseCommand from temporalio.common import RetryPolicy, WorkflowIDReusePolicy -from posthog.temporal.client import connect -from posthog.temporal.workflows import WORKFLOWS +from posthog.temporal.common.client import connect +from posthog.temporal.batch_exports import WORKFLOWS class Command(BaseCommand): diff --git a/posthog/management/commands/start_temporal_worker.py b/posthog/management/commands/start_temporal_worker.py index 6d3c3bfc5d069..3087e4cacc4a1 100644 --- a/posthog/management/commands/start_temporal_worker.py +++ b/posthog/management/commands/start_temporal_worker.py @@ -8,7 +8,15 @@ from django.conf import settings from django.core.management.base import BaseCommand -from posthog.temporal.worker import start_worker +from posthog.temporal.common.worker import start_worker +from posthog.temporal.batch_exports import WORKFLOWS as BATCH_EXPORTS_WORKFLOWS, ACTIVITIES as BATCH_EXPORTS_ACTIVITIES + +WORKFLOWS_DICT = { + "no-sandbox-python-django": BATCH_EXPORTS_WORKFLOWS, +} +ACTIVITIES_DICT = { + "no-sandbox-python-django": BATCH_EXPORTS_ACTIVITIES, +} class Command(BaseCommand): @@ -65,6 +73,12 @@ def handle(self, *args, **options): client_cert = options.get("client_cert", None) client_key = options.get("client_key", None) + try: + workflows = WORKFLOWS_DICT[task_queue] + activities = ACTIVITIES_DICT[task_queue] + except KeyError: + raise ValueError(f"Task queue {task_queue} not found in WORKFLOWS_DICT or ACTIVITIES_DICT") + if options["client_key"]: options["client_key"] = "--SECRET--" logging.info(f"Starting Temporal Worker with options: {options}") @@ -83,5 +97,7 @@ def handle(self, *args, **options): server_root_ca_cert=server_root_ca_cert, client_cert=client_cert, client_key=client_key, + workflows=workflows, + activities=activities, ) ) diff --git a/posthog/management/commands/test/test_create_batch_export_from_app.py b/posthog/management/commands/test/test_create_batch_export_from_app.py index 9f1b8be67f683..a5c8fffc5f4d4 100644 --- a/posthog/management/commands/test/test_create_batch_export_from_app.py +++ b/posthog/management/commands/test/test_create_batch_export_from_app.py @@ -18,8 +18,8 @@ map_plugin_config_to_destination, ) from posthog.models import Plugin, PluginAttachment, PluginConfig -from posthog.temporal.client import sync_connect -from posthog.temporal.codec import EncryptionCodec +from posthog.temporal.common.client import sync_connect +from posthog.temporal.common.codec import EncryptionCodec @pytest.fixture diff --git a/posthog/models/team/util.py b/posthog/models/team/util.py index ccaa249c559bf..a21b75ab80384 100644 --- a/posthog/models/team/util.py +++ b/posthog/models/team/util.py @@ -1,8 +1,8 @@ from datetime import timedelta from typing import Any, List -from posthog.temporal.client import sync_connect -from posthog.batch_exports.service import delete_schedule +from posthog.temporal.common.client import sync_connect +from posthog.batch_exports.service import batch_export_delete_schedule from posthog.cache_utils import cache_for from posthog.models.async_migration import is_async_migration_complete @@ -44,7 +44,7 @@ def delete_batch_exports(team_ids: List[int]): batch_export.delete() batch_export.destination.delete() - delete_schedule(temporal, str(schedule_id)) + batch_export_delete_schedule(temporal, str(schedule_id)) can_enable_actor_on_events = False diff --git a/posthog/temporal/__init__.py b/posthog/temporal/__init__.py index e69de29bb2d1d..8b137891791fe 100644 --- a/posthog/temporal/__init__.py +++ b/posthog/temporal/__init__.py @@ -0,0 +1 @@ + diff --git a/posthog/temporal/workflows/__init__.py b/posthog/temporal/batch_exports/__init__.py similarity index 71% rename from posthog/temporal/workflows/__init__.py rename to posthog/temporal/batch_exports/__init__.py index 4b27b56500be6..d3b41be4bcd32 100644 --- a/posthog/temporal/workflows/__init__.py +++ b/posthog/temporal/batch_exports/__init__.py @@ -1,38 +1,38 @@ from typing import Callable, Sequence -from posthog.temporal.workflows.backfill_batch_export import ( +from posthog.temporal.batch_exports.backfill_batch_export import ( BackfillBatchExportWorkflow, backfill_schedule, get_schedule_frequency, ) -from posthog.temporal.workflows.batch_exports import ( +from posthog.temporal.batch_exports.batch_exports import ( create_batch_export_backfill_model, create_export_run, update_batch_export_backfill_model_status, update_export_run_status, ) -from posthog.temporal.workflows.bigquery_batch_export import ( +from posthog.temporal.batch_exports.bigquery_batch_export import ( BigQueryBatchExportWorkflow, insert_into_bigquery_activity, ) -from posthog.temporal.workflows.noop import NoOpWorkflow, noop_activity -from posthog.temporal.workflows.postgres_batch_export import ( +from posthog.temporal.batch_exports.noop import NoOpWorkflow, noop_activity +from posthog.temporal.batch_exports.postgres_batch_export import ( PostgresBatchExportWorkflow, insert_into_postgres_activity, ) -from posthog.temporal.workflows.redshift_batch_export import ( +from posthog.temporal.batch_exports.redshift_batch_export import ( RedshiftBatchExportWorkflow, insert_into_redshift_activity, ) -from posthog.temporal.workflows.s3_batch_export import ( +from posthog.temporal.batch_exports.s3_batch_export import ( S3BatchExportWorkflow, insert_into_s3_activity, ) -from posthog.temporal.workflows.snowflake_batch_export import ( +from posthog.temporal.batch_exports.snowflake_batch_export import ( SnowflakeBatchExportWorkflow, insert_into_snowflake_activity, ) -from posthog.temporal.workflows.squash_person_overrides import * +from posthog.temporal.batch_exports.squash_person_overrides import * WORKFLOWS = [ BackfillBatchExportWorkflow, diff --git a/posthog/temporal/workflows/backfill_batch_export.py b/posthog/temporal/batch_exports/backfill_batch_export.py similarity index 98% rename from posthog/temporal/workflows/backfill_batch_export.py rename to posthog/temporal/batch_exports/backfill_batch_export.py index f5a45b42effd4..0dbc1fcf13d0c 100644 --- a/posthog/temporal/workflows/backfill_batch_export.py +++ b/posthog/temporal/batch_exports/backfill_batch_export.py @@ -14,9 +14,9 @@ from django.conf import settings from posthog.batch_exports.service import BackfillBatchExportInputs -from posthog.temporal.client import connect -from posthog.temporal.workflows.base import PostHogWorkflow -from posthog.temporal.workflows.batch_exports import ( +from posthog.temporal.common.client import connect +from posthog.temporal.batch_exports.base import PostHogWorkflow +from posthog.temporal.batch_exports.batch_exports import ( CreateBatchExportBackfillInputs, UpdateBatchExportBackfillStatusInputs, create_batch_export_backfill_model, diff --git a/posthog/temporal/workflows/base.py b/posthog/temporal/batch_exports/base.py similarity index 100% rename from posthog/temporal/workflows/base.py rename to posthog/temporal/batch_exports/base.py diff --git a/posthog/temporal/workflows/batch_exports.py b/posthog/temporal/batch_exports/batch_exports.py similarity index 99% rename from posthog/temporal/workflows/batch_exports.py rename to posthog/temporal/batch_exports/batch_exports.py index 5b71b87550892..9d9112b88a29c 100644 --- a/posthog/temporal/workflows/batch_exports.py +++ b/posthog/temporal/batch_exports/batch_exports.py @@ -21,8 +21,8 @@ update_batch_export_backfill_status, update_batch_export_run_status, ) -from posthog.temporal.workflows.logger import bind_batch_exports_logger -from posthog.temporal.workflows.metrics import get_export_finished_metric, get_export_started_metric +from posthog.temporal.batch_exports.logger import bind_batch_exports_logger +from posthog.temporal.batch_exports.metrics import get_export_finished_metric, get_export_started_metric SELECT_QUERY_TEMPLATE = Template( """ diff --git a/posthog/temporal/workflows/bigquery_batch_export.py b/posthog/temporal/batch_exports/bigquery_batch_export.py similarity index 96% rename from posthog/temporal/workflows/bigquery_batch_export.py rename to posthog/temporal/batch_exports/bigquery_batch_export.py index 759b755427f2d..f4055de5760e5 100644 --- a/posthog/temporal/workflows/bigquery_batch_export.py +++ b/posthog/temporal/batch_exports/bigquery_batch_export.py @@ -11,12 +11,12 @@ from temporalio.common import RetryPolicy from posthog.batch_exports.service import BigQueryBatchExportInputs -from posthog.temporal.utils import ( +from posthog.temporal.common.utils import ( HeartbeatDetails, should_resume_from_activity_heartbeat, ) -from posthog.temporal.workflows.base import PostHogWorkflow -from posthog.temporal.workflows.batch_exports import ( +from posthog.temporal.batch_exports.base import PostHogWorkflow +from posthog.temporal.batch_exports.batch_exports import ( BatchExportTemporaryFile, CreateBatchExportRunInputs, UpdateBatchExportRunStatusInputs, @@ -26,9 +26,9 @@ get_results_iterator, get_rows_count, ) -from posthog.temporal.workflows.clickhouse import get_client -from posthog.temporal.workflows.logger import bind_batch_exports_logger -from posthog.temporal.workflows.metrics import get_bytes_exported_metric, get_rows_exported_metric +from posthog.temporal.batch_exports.clickhouse import get_client +from posthog.temporal.batch_exports.logger import bind_batch_exports_logger +from posthog.temporal.batch_exports.metrics import get_bytes_exported_metric, get_rows_exported_metric async def load_jsonl_file_to_bigquery_table(jsonl_file, table, table_schema, bigquery_client): diff --git a/posthog/temporal/workflows/clickhouse.py b/posthog/temporal/batch_exports/clickhouse.py similarity index 100% rename from posthog/temporal/workflows/clickhouse.py rename to posthog/temporal/batch_exports/clickhouse.py diff --git a/posthog/temporal/workflows/logger.py b/posthog/temporal/batch_exports/logger.py similarity index 100% rename from posthog/temporal/workflows/logger.py rename to posthog/temporal/batch_exports/logger.py diff --git a/posthog/temporal/workflows/metrics.py b/posthog/temporal/batch_exports/metrics.py similarity index 100% rename from posthog/temporal/workflows/metrics.py rename to posthog/temporal/batch_exports/metrics.py diff --git a/posthog/temporal/workflows/noop.py b/posthog/temporal/batch_exports/noop.py similarity index 95% rename from posthog/temporal/workflows/noop.py rename to posthog/temporal/batch_exports/noop.py index 20fa2f1ac097e..77bc19c455b51 100644 --- a/posthog/temporal/workflows/noop.py +++ b/posthog/temporal/batch_exports/noop.py @@ -7,7 +7,7 @@ from temporalio import activity, workflow from posthog.batch_exports.service import NoOpInputs -from posthog.temporal.workflows.base import PostHogWorkflow +from posthog.temporal.batch_exports.base import PostHogWorkflow @dataclass diff --git a/posthog/temporal/workflows/postgres_batch_export.py b/posthog/temporal/batch_exports/postgres_batch_export.py similarity index 97% rename from posthog/temporal/workflows/postgres_batch_export.py rename to posthog/temporal/batch_exports/postgres_batch_export.py index 025c29c6421ea..957ea42daad39 100644 --- a/posthog/temporal/workflows/postgres_batch_export.py +++ b/posthog/temporal/batch_exports/postgres_batch_export.py @@ -12,8 +12,8 @@ from temporalio.common import RetryPolicy from posthog.batch_exports.service import PostgresBatchExportInputs -from posthog.temporal.workflows.base import PostHogWorkflow -from posthog.temporal.workflows.batch_exports import ( +from posthog.temporal.batch_exports.base import PostHogWorkflow +from posthog.temporal.batch_exports.batch_exports import ( BatchExportTemporaryFile, CreateBatchExportRunInputs, UpdateBatchExportRunStatusInputs, @@ -23,9 +23,9 @@ get_results_iterator, get_rows_count, ) -from posthog.temporal.workflows.clickhouse import get_client -from posthog.temporal.workflows.logger import bind_batch_exports_logger -from posthog.temporal.workflows.metrics import get_bytes_exported_metric, get_rows_exported_metric +from posthog.temporal.batch_exports.clickhouse import get_client +from posthog.temporal.batch_exports.logger import bind_batch_exports_logger +from posthog.temporal.batch_exports.metrics import get_bytes_exported_metric, get_rows_exported_metric @contextlib.asynccontextmanager diff --git a/posthog/temporal/workflows/redshift_batch_export.py b/posthog/temporal/batch_exports/redshift_batch_export.py similarity index 97% rename from posthog/temporal/workflows/redshift_batch_export.py rename to posthog/temporal/batch_exports/redshift_batch_export.py index fd2ba4c9e9193..57333351c1c12 100644 --- a/posthog/temporal/workflows/redshift_batch_export.py +++ b/posthog/temporal/batch_exports/redshift_batch_export.py @@ -12,8 +12,8 @@ from temporalio.common import RetryPolicy from posthog.batch_exports.service import RedshiftBatchExportInputs -from posthog.temporal.workflows.base import PostHogWorkflow -from posthog.temporal.workflows.batch_exports import ( +from posthog.temporal.batch_exports.base import PostHogWorkflow +from posthog.temporal.batch_exports.batch_exports import ( CreateBatchExportRunInputs, UpdateBatchExportRunStatusInputs, create_export_run, @@ -22,10 +22,10 @@ get_results_iterator, get_rows_count, ) -from posthog.temporal.workflows.clickhouse import get_client -from posthog.temporal.workflows.logger import bind_batch_exports_logger -from posthog.temporal.workflows.metrics import get_rows_exported_metric -from posthog.temporal.workflows.postgres_batch_export import ( +from posthog.temporal.batch_exports.clickhouse import get_client +from posthog.temporal.batch_exports.logger import bind_batch_exports_logger +from posthog.temporal.batch_exports.metrics import get_rows_exported_metric +from posthog.temporal.batch_exports.postgres_batch_export import ( PostgresInsertInputs, create_table_in_postgres, postgres_connection, diff --git a/posthog/temporal/workflows/s3_batch_export.py b/posthog/temporal/batch_exports/s3_batch_export.py similarity index 98% rename from posthog/temporal/workflows/s3_batch_export.py rename to posthog/temporal/batch_exports/s3_batch_export.py index 42e66f10d2ae3..f4a3e8d86238c 100644 --- a/posthog/temporal/workflows/s3_batch_export.py +++ b/posthog/temporal/batch_exports/s3_batch_export.py @@ -13,8 +13,8 @@ from temporalio.common import RetryPolicy from posthog.batch_exports.service import S3BatchExportInputs -from posthog.temporal.workflows.base import PostHogWorkflow -from posthog.temporal.workflows.batch_exports import ( +from posthog.temporal.batch_exports.base import PostHogWorkflow +from posthog.temporal.batch_exports.batch_exports import ( BatchExportTemporaryFile, CreateBatchExportRunInputs, UpdateBatchExportRunStatusInputs, @@ -24,9 +24,9 @@ get_results_iterator, get_rows_count, ) -from posthog.temporal.workflows.clickhouse import get_client -from posthog.temporal.workflows.logger import bind_batch_exports_logger -from posthog.temporal.workflows.metrics import get_bytes_exported_metric, get_rows_exported_metric +from posthog.temporal.batch_exports.clickhouse import get_client +from posthog.temporal.batch_exports.logger import bind_batch_exports_logger +from posthog.temporal.batch_exports.metrics import get_bytes_exported_metric, get_rows_exported_metric def get_allowed_template_variables(inputs) -> dict[str, str]: diff --git a/posthog/temporal/workflows/snowflake_batch_export.py b/posthog/temporal/batch_exports/snowflake_batch_export.py similarity index 97% rename from posthog/temporal/workflows/snowflake_batch_export.py rename to posthog/temporal/batch_exports/snowflake_batch_export.py index b216f20af0412..03e59156b5c44 100644 --- a/posthog/temporal/workflows/snowflake_batch_export.py +++ b/posthog/temporal/batch_exports/snowflake_batch_export.py @@ -14,14 +14,14 @@ from temporalio.common import RetryPolicy from posthog.batch_exports.service import SnowflakeBatchExportInputs -from posthog.temporal.utils import ( +from posthog.temporal.common.utils import ( HeartbeatDetails, HeartbeatParseError, NotEnoughHeartbeatValuesError, should_resume_from_activity_heartbeat, ) -from posthog.temporal.workflows.base import PostHogWorkflow -from posthog.temporal.workflows.batch_exports import ( +from posthog.temporal.batch_exports.base import PostHogWorkflow +from posthog.temporal.batch_exports.batch_exports import ( BatchExportTemporaryFile, CreateBatchExportRunInputs, UpdateBatchExportRunStatusInputs, @@ -31,9 +31,9 @@ get_results_iterator, get_rows_count, ) -from posthog.temporal.workflows.clickhouse import get_client -from posthog.temporal.workflows.logger import bind_batch_exports_logger -from posthog.temporal.workflows.metrics import get_bytes_exported_metric, get_rows_exported_metric +from posthog.temporal.batch_exports.clickhouse import get_client +from posthog.temporal.batch_exports.logger import bind_batch_exports_logger +from posthog.temporal.batch_exports.metrics import get_bytes_exported_metric, get_rows_exported_metric class SnowflakeFileNotUploadedError(Exception): diff --git a/posthog/temporal/workflows/squash_person_overrides.py b/posthog/temporal/batch_exports/squash_person_overrides.py similarity index 99% rename from posthog/temporal/workflows/squash_person_overrides.py rename to posthog/temporal/batch_exports/squash_person_overrides.py index 1386964339589..943dd91192e21 100644 --- a/posthog/temporal/workflows/squash_person_overrides.py +++ b/posthog/temporal/batch_exports/squash_person_overrides.py @@ -11,7 +11,7 @@ from temporalio.common import RetryPolicy from posthog.clickhouse.client.execute import sync_execute -from posthog.temporal.workflows.base import PostHogWorkflow +from posthog.temporal.batch_exports.base import PostHogWorkflow EPOCH = datetime(1970, 1, 1, 0, 0, tzinfo=timezone.utc) diff --git a/posthog/temporal/common/__init__.py b/posthog/temporal/common/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/posthog/temporal/client.py b/posthog/temporal/common/client.py similarity index 95% rename from posthog/temporal/client.py rename to posthog/temporal/common/client.py index df6dc1cc81e74..5794115c5cd18 100644 --- a/posthog/temporal/client.py +++ b/posthog/temporal/common/client.py @@ -5,7 +5,7 @@ from django.conf import settings from temporalio.client import Client, TLSConfig -from posthog.temporal.codec import EncryptionCodec +from posthog.temporal.common.codec import EncryptionCodec async def connect( diff --git a/posthog/temporal/codec.py b/posthog/temporal/common/codec.py similarity index 100% rename from posthog/temporal/codec.py rename to posthog/temporal/common/codec.py diff --git a/posthog/temporal/common/schedule.py b/posthog/temporal/common/schedule.py new file mode 100644 index 0000000000000..b827df24dbbc1 --- /dev/null +++ b/posthog/temporal/common/schedule.py @@ -0,0 +1,65 @@ +from asgiref.sync import async_to_sync +from temporalio.client import ( + Client, + Schedule, + ScheduleUpdate, + ScheduleUpdateInput, +) + + +@async_to_sync +async def create_schedule(temporal: Client, id: str, schedule: Schedule, trigger_immediately: bool = False): + """Create a Temporal Schedule.""" + return await temporal.create_schedule( + id=id, + schedule=schedule, + trigger_immediately=trigger_immediately, + ) + + +@async_to_sync +async def update_schedule(temporal: Client, id: str, schedule: Schedule) -> None: + """Update a Temporal Schedule.""" + handle = temporal.get_schedule_handle(id) + + async def updater(_: ScheduleUpdateInput) -> ScheduleUpdate: + return ScheduleUpdate(schedule=schedule) + + return await handle.update( + updater=updater, + ) + + +@async_to_sync +async def unpause_schedule(temporal: Client, schedule_id: str, note: str | None = None) -> None: + """Unpause a Temporal Schedule.""" + handle = temporal.get_schedule_handle(schedule_id) + await handle.unpause(note=note) + + +@async_to_sync +async def delete_schedule(temporal: Client, schedule_id: str) -> None: + """Delete a Temporal Schedule.""" + handle = temporal.get_schedule_handle(schedule_id) + await handle.delete() + + +@async_to_sync +async def describe_schedule(temporal: Client, schedule_id: str): + """Describe a Temporal Schedule.""" + handle = temporal.get_schedule_handle(schedule_id) + return await handle.describe() + + +@async_to_sync +async def pause_schedule(temporal: Client, schedule_id: str, note: str | None = None) -> None: + """Pause a Temporal Schedule.""" + handle = temporal.get_schedule_handle(schedule_id) + await handle.pause(note=note) + + +@async_to_sync +async def trigger_schedule(temporal: Client, schedule_id: str, note: str | None = None) -> None: + """Pause a Temporal Schedule.""" + handle = temporal.get_schedule_handle(schedule_id) + await handle.trigger() diff --git a/posthog/temporal/sentry.py b/posthog/temporal/common/sentry.py similarity index 100% rename from posthog/temporal/sentry.py rename to posthog/temporal/common/sentry.py diff --git a/posthog/temporal/utils.py b/posthog/temporal/common/utils.py similarity index 100% rename from posthog/temporal/utils.py rename to posthog/temporal/common/utils.py diff --git a/posthog/temporal/worker.py b/posthog/temporal/common/worker.py similarity index 84% rename from posthog/temporal/worker.py rename to posthog/temporal/common/worker.py index 05666379e45c6..f36d820cee6b1 100644 --- a/posthog/temporal/worker.py +++ b/posthog/temporal/common/worker.py @@ -5,9 +5,8 @@ from temporalio.runtime import Runtime, TelemetryConfig, PrometheusConfig from temporalio.worker import UnsandboxedWorkflowRunner, Worker -from posthog.temporal.client import connect -from posthog.temporal.workflows import ACTIVITIES, WORKFLOWS -from posthog.temporal.sentry import SentryInterceptor +from posthog.temporal.common.client import connect +from posthog.temporal.common.sentry import SentryInterceptor async def start_worker( @@ -16,6 +15,8 @@ async def start_worker( metrics_port, namespace, task_queue, + workflows, + activities, server_root_ca_cert=None, client_cert=None, client_key=None, @@ -33,8 +34,8 @@ async def start_worker( worker = Worker( client, task_queue=task_queue, - workflows=WORKFLOWS, - activities=ACTIVITIES, + workflows=workflows, + activities=activities, workflow_runner=UnsandboxedWorkflowRunner(), graceful_shutdown_timeout=timedelta(minutes=5), interceptors=[SentryInterceptor()], diff --git a/posthog/temporal/tests/batch_exports/test_backfill_batch_export.py b/posthog/temporal/tests/batch_exports/test_backfill_batch_export.py index 37790bcd52711..6b1abd92f077c 100644 --- a/posthog/temporal/tests/batch_exports/test_backfill_batch_export.py +++ b/posthog/temporal/tests/batch_exports/test_backfill_batch_export.py @@ -16,7 +16,7 @@ adelete_batch_export, afetch_batch_export_backfills, ) -from posthog.temporal.workflows.backfill_batch_export import ( +from posthog.temporal.batch_exports.backfill_batch_export import ( BackfillBatchExportInputs, BackfillBatchExportWorkflow, BackfillScheduleInputs, diff --git a/posthog/temporal/tests/batch_exports/test_batch_exports.py b/posthog/temporal/tests/batch_exports/test_batch_exports.py index 2f31ac871f60e..f1ef5236202e9 100644 --- a/posthog/temporal/tests/batch_exports/test_batch_exports.py +++ b/posthog/temporal/tests/batch_exports/test_batch_exports.py @@ -12,7 +12,7 @@ to_isoformat, ) from posthog.temporal.tests.utils.events import generate_test_events_in_clickhouse -from posthog.temporal.workflows.batch_exports import ( +from posthog.temporal.batch_exports.batch_exports import ( BatchExportTemporaryFile, get_data_interval, get_results_iterator, diff --git a/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py index d6696407c60e2..13adab3ffe6d5 100644 --- a/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py +++ b/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py @@ -19,11 +19,11 @@ from posthog.temporal.tests.utils.events import generate_test_events_in_clickhouse from posthog.temporal.tests.utils.models import acreate_batch_export, adelete_batch_export, afetch_batch_export_runs -from posthog.temporal.workflows.batch_exports import ( +from posthog.temporal.batch_exports.batch_exports import ( create_export_run, update_export_run_status, ) -from posthog.temporal.workflows.bigquery_batch_export import ( +from posthog.temporal.batch_exports.bigquery_batch_export import ( BigQueryBatchExportInputs, BigQueryBatchExportWorkflow, BigQueryInsertInputs, diff --git a/posthog/temporal/tests/batch_exports/test_logger.py b/posthog/temporal/tests/batch_exports/test_logger.py index b750281b12290..4843939eb783c 100644 --- a/posthog/temporal/tests/batch_exports/test_logger.py +++ b/posthog/temporal/tests/batch_exports/test_logger.py @@ -23,7 +23,7 @@ TRUNCATE_LOG_ENTRIES_TABLE_SQL, ) from posthog.kafka_client.topics import KAFKA_LOG_ENTRIES -from posthog.temporal.workflows.logger import BACKGROUND_LOGGER_TASKS, bind_batch_exports_logger, configure_logger +from posthog.temporal.batch_exports.logger import BACKGROUND_LOGGER_TASKS, bind_batch_exports_logger, configure_logger pytestmark = pytest.mark.asyncio diff --git a/posthog/temporal/tests/batch_exports/test_postgres_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_postgres_batch_export_workflow.py index 6a70e9f2eb74c..1a0233bdd027d 100644 --- a/posthog/temporal/tests/batch_exports/test_postgres_batch_export_workflow.py +++ b/posthog/temporal/tests/batch_exports/test_postgres_batch_export_workflow.py @@ -18,11 +18,11 @@ from posthog.temporal.tests.utils.events import generate_test_events_in_clickhouse from posthog.temporal.tests.utils.models import acreate_batch_export, adelete_batch_export, afetch_batch_export_runs -from posthog.temporal.workflows.batch_exports import ( +from posthog.temporal.batch_exports.batch_exports import ( create_export_run, update_export_run_status, ) -from posthog.temporal.workflows.postgres_batch_export import ( +from posthog.temporal.batch_exports.postgres_batch_export import ( PostgresBatchExportInputs, PostgresBatchExportWorkflow, PostgresInsertInputs, diff --git a/posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py index cea71a458013f..c727c6e771bd3 100644 --- a/posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py +++ b/posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py @@ -17,11 +17,11 @@ from posthog.temporal.tests.utils.events import generate_test_events_in_clickhouse from posthog.temporal.tests.utils.models import acreate_batch_export, adelete_batch_export, afetch_batch_export_runs -from posthog.temporal.workflows.batch_exports import ( +from posthog.temporal.batch_exports.batch_exports import ( create_export_run, update_export_run_status, ) -from posthog.temporal.workflows.redshift_batch_export import ( +from posthog.temporal.batch_exports.redshift_batch_export import ( RedshiftBatchExportInputs, RedshiftBatchExportWorkflow, RedshiftInsertInputs, diff --git a/posthog/temporal/tests/batch_exports/test_run_updates.py b/posthog/temporal/tests/batch_exports/test_run_updates.py index 76f7f9bfee14d..fc03d26cbda0a 100644 --- a/posthog/temporal/tests/batch_exports/test_run_updates.py +++ b/posthog/temporal/tests/batch_exports/test_run_updates.py @@ -10,7 +10,7 @@ Organization, Team, ) -from posthog.temporal.workflows.batch_exports import ( +from posthog.temporal.batch_exports.batch_exports import ( CreateBatchExportRunInputs, UpdateBatchExportRunStatusInputs, create_export_run, diff --git a/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py index 416d9eefdf554..1b584e7e2f3ff 100644 --- a/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py +++ b/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py @@ -25,11 +25,11 @@ from posthog.temporal.tests.utils.datetimes import to_isoformat from posthog.temporal.tests.utils.events import EventValues, generate_test_events_in_clickhouse from posthog.temporal.tests.utils.models import acreate_batch_export, adelete_batch_export, afetch_batch_export_runs -from posthog.temporal.workflows.batch_exports import ( +from posthog.temporal.batch_exports.batch_exports import ( create_export_run, update_export_run_status, ) -from posthog.temporal.workflows.s3_batch_export import ( +from posthog.temporal.batch_exports.s3_batch_export import ( HeartbeatDetails, S3BatchExportInputs, S3BatchExportWorkflow, @@ -280,7 +280,7 @@ async def test_insert_into_s3_activity_puts_data_into_s3( BATCH_EXPORT_S3_UPLOAD_CHUNK_SIZE_BYTES=5 * 1024**2 ): # 5MB, the minimum for Multipart uploads with mock.patch( - "posthog.temporal.workflows.s3_batch_export.aioboto3.Session.client", + "posthog.temporal.batch_exports.s3_batch_export.aioboto3.Session.client", side_effect=create_test_client, ): await activity_environment.run(insert_into_s3_activity, insert_inputs) @@ -407,7 +407,7 @@ async def test_s3_export_workflow_with_minio_bucket( ): # We patch the S3 client to return our client that targets MinIO. with mock.patch( - "posthog.temporal.workflows.s3_batch_export.aioboto3.Session.client", + "posthog.temporal.batch_exports.s3_batch_export.aioboto3.Session.client", side_effect=create_test_client, ): await activity_environment.client.execute_workflow( @@ -541,7 +541,7 @@ async def create_minio_client(*args, **kwargs): workflow_runner=UnsandboxedWorkflowRunner(), ): with mock.patch( - "posthog.temporal.workflows.s3_batch_export.aioboto3.Session.client", + "posthog.temporal.batch_exports.s3_batch_export.aioboto3.Session.client", side_effect=create_minio_client, ): await activity_environment.client.execute_workflow( @@ -623,7 +623,7 @@ async def test_s3_export_workflow_with_minio_bucket_and_a_lot_of_data( workflow_runner=UnsandboxedWorkflowRunner(), ): with mock.patch( - "posthog.temporal.workflows.s3_batch_export.aioboto3.Session.client", + "posthog.temporal.batch_exports.s3_batch_export.aioboto3.Session.client", side_effect=create_test_client, ): await activity_environment.client.execute_workflow( @@ -699,7 +699,7 @@ async def test_s3_export_workflow_defaults_to_timestamp_on_null_inserted_at( workflow_runner=UnsandboxedWorkflowRunner(), ): with mock.patch( - "posthog.temporal.workflows.s3_batch_export.aioboto3.Session.client", + "posthog.temporal.batch_exports.s3_batch_export.aioboto3.Session.client", side_effect=create_test_client, ): await activity_environment.client.execute_workflow( @@ -784,7 +784,7 @@ async def test_s3_export_workflow_with_minio_bucket_and_custom_key_prefix( workflow_runner=UnsandboxedWorkflowRunner(), ): with mock.patch( - "posthog.temporal.workflows.s3_batch_export.aioboto3.Session.client", + "posthog.temporal.batch_exports.s3_batch_export.aioboto3.Session.client", side_effect=create_test_client, ): await activity_environment.client.execute_workflow( @@ -1136,7 +1136,7 @@ def assert_heartbeat_details(*details): with override_settings(BATCH_EXPORT_S3_UPLOAD_CHUNK_SIZE_BYTES=5 * 1024**2): with mock.patch( - "posthog.temporal.workflows.s3_batch_export.aioboto3.Session.client", + "posthog.temporal.batch_exports.s3_batch_export.aioboto3.Session.client", side_effect=create_test_client, ): await activity_environment.run(insert_into_s3_activity, insert_inputs) diff --git a/posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py index 9561a8bf2ea35..778192a9705fe 100644 --- a/posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py +++ b/posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py @@ -25,11 +25,11 @@ from posthog.temporal.tests.utils.events import generate_test_events_in_clickhouse from posthog.temporal.tests.utils.models import acreate_batch_export, adelete_batch_export, afetch_batch_export_runs -from posthog.temporal.workflows.batch_exports import ( +from posthog.temporal.batch_exports.batch_exports import ( create_export_run, update_export_run_status, ) -from posthog.temporal.workflows.snowflake_batch_export import ( +from posthog.temporal.batch_exports.snowflake_batch_export import ( SnowflakeBatchExportInputs, SnowflakeBatchExportWorkflow, SnowflakeInsertInputs, @@ -404,7 +404,7 @@ async def test_snowflake_export_workflow_exports_events( workflow_runner=UnsandboxedWorkflowRunner(), ): with unittest.mock.patch( - "posthog.temporal.workflows.snowflake_batch_export.snowflake.connector.connect", + "posthog.temporal.batch_exports.snowflake_batch_export.snowflake.connector.connect", ) as mock, override_settings(BATCH_EXPORT_SNOWFLAKE_UPLOAD_CHUNK_SIZE_BYTES=1): fake_conn = FakeSnowflakeConnection() mock.return_value = fake_conn @@ -559,7 +559,7 @@ def __init__(self, *args, **kwargs): super().__init__(*args, failure_mode="put", **kwargs) with unittest.mock.patch( - "posthog.temporal.workflows.snowflake_batch_export.snowflake.connector.connect", + "posthog.temporal.batch_exports.snowflake_batch_export.snowflake.connector.connect", side_effect=FakeSnowflakeConnectionFailOnPut, ): with pytest.raises(WorkflowFailureError) as exc_info: @@ -625,7 +625,7 @@ def __init__(self, *args, **kwargs): super().__init__(*args, failure_mode="copy", **kwargs) with unittest.mock.patch( - "posthog.temporal.workflows.snowflake_batch_export.snowflake.connector.connect", + "posthog.temporal.batch_exports.snowflake_batch_export.snowflake.connector.connect", side_effect=FakeSnowflakeConnectionFailOnCopy, ): with pytest.raises(WorkflowFailureError) as exc_info: diff --git a/posthog/temporal/tests/conftest.py b/posthog/temporal/tests/conftest.py index 4c480989db92b..ad53228157b79 100644 --- a/posthog/temporal/tests/conftest.py +++ b/posthog/temporal/tests/conftest.py @@ -9,8 +9,8 @@ from temporalio.testing import ActivityEnvironment from posthog.models import Organization, Team -from posthog.temporal.client import connect -from posthog.temporal.workflows.clickhouse import ClickHouseClient +from posthog.temporal.common.client import connect +from posthog.temporal.batch_exports.clickhouse import ClickHouseClient @pytest.fixture @@ -101,7 +101,7 @@ async def workflows(request): try: return request.param except AttributeError: - from posthog.temporal.workflows import WORKFLOWS + from posthog.temporal.batch_exports import WORKFLOWS return WORKFLOWS @@ -116,7 +116,7 @@ async def activities(request): try: return request.param except AttributeError: - from posthog.temporal.workflows import ACTIVITIES + from posthog.temporal.batch_exports import ACTIVITIES return ACTIVITIES diff --git a/posthog/temporal/tests/test_clickhouse.py b/posthog/temporal/tests/test_clickhouse.py index d31d6c2b5dd59..53d59b21e285b 100644 --- a/posthog/temporal/tests/test_clickhouse.py +++ b/posthog/temporal/tests/test_clickhouse.py @@ -3,7 +3,7 @@ import pytest -from posthog.temporal.workflows.clickhouse import encode_clickhouse_data +from posthog.temporal.batch_exports.clickhouse import encode_clickhouse_data @pytest.mark.parametrize( diff --git a/posthog/temporal/tests/test_encryption_codec.py b/posthog/temporal/tests/test_encryption_codec.py index a3d9c33ef478d..679ee2a99b462 100644 --- a/posthog/temporal/tests/test_encryption_codec.py +++ b/posthog/temporal/tests/test_encryption_codec.py @@ -10,8 +10,8 @@ from temporalio.worker import UnsandboxedWorkflowRunner, Worker from posthog.batch_exports.service import NoOpInputs -from posthog.temporal.codec import EncryptionCodec -from posthog.temporal.workflows.noop import NoOpWorkflow, noop_activity +from posthog.temporal.common.codec import EncryptionCodec +from posthog.temporal.batch_exports.noop import NoOpWorkflow, noop_activity def get_history_event_payloads(event): diff --git a/posthog/temporal/tests/test_squash_person_overrides_workflow.py b/posthog/temporal/tests/test_squash_person_overrides_workflow.py index 8cde17ccb42d1..fa72cb585b6b5 100644 --- a/posthog/temporal/tests/test_squash_person_overrides_workflow.py +++ b/posthog/temporal/tests/test_squash_person_overrides_workflow.py @@ -22,7 +22,7 @@ PERSON_OVERRIDES_CREATE_MATERIALIZED_VIEW_SQL, PERSON_OVERRIDES_CREATE_TABLE_SQL, ) -from posthog.temporal.workflows.squash_person_overrides import ( +from posthog.temporal.batch_exports.squash_person_overrides import ( QueryInputs, SerializablePersonOverrideToDelete, SquashPersonOverridesInputs, diff --git a/posthog/temporal/tests/utils/events.py b/posthog/temporal/tests/utils/events.py index 58142fb21466f..b35250f381412 100644 --- a/posthog/temporal/tests/utils/events.py +++ b/posthog/temporal/tests/utils/events.py @@ -6,7 +6,7 @@ import uuid from posthog.temporal.tests.utils.datetimes import date_range -from posthog.temporal.workflows.clickhouse import ClickHouseClient +from posthog.temporal.batch_exports.clickhouse import ClickHouseClient class EventValues(typing.TypedDict):