From 94927f657bc7d5c2b814012a1929f32b48934206 Mon Sep 17 00:00:00 2001 From: Tom Owers Date: Thu, 19 Dec 2024 21:19:31 +0100 Subject: [PATCH] Only execute one workflow at a time --- posthog/temporal/common/worker.py | 42 ++++++++++++++++++++++--------- 1 file changed, 30 insertions(+), 12 deletions(-) diff --git a/posthog/temporal/common/worker.py b/posthog/temporal/common/worker.py index 2c78aae3f4a8c..f5db3d6b0417d 100644 --- a/posthog/temporal/common/worker.py +++ b/posthog/temporal/common/worker.py @@ -6,6 +6,7 @@ from temporalio.runtime import PrometheusConfig, Runtime, TelemetryConfig from temporalio.worker import UnsandboxedWorkflowRunner, Worker +from posthog.constants import DATA_WAREHOUSE_TASK_QUEUE_V2 from posthog.temporal.common.client import connect from posthog.temporal.common.sentry import SentryInterceptor @@ -34,18 +35,35 @@ async def start_worker( client_key, runtime=runtime, ) - worker = Worker( - client, - task_queue=task_queue, - workflows=workflows, - activities=activities, - workflow_runner=UnsandboxedWorkflowRunner(), - graceful_shutdown_timeout=timedelta(minutes=5), - interceptors=[SentryInterceptor()], - activity_executor=ThreadPoolExecutor(max_workers=max_concurrent_activities or 50), - max_concurrent_activities=max_concurrent_activities or 50, - max_concurrent_workflow_tasks=max_concurrent_workflow_tasks, - ) + if task_queue == DATA_WAREHOUSE_TASK_QUEUE_V2: + worker = Worker( + client, + task_queue=task_queue, + workflows=workflows, + activities=activities, + workflow_runner=UnsandboxedWorkflowRunner(), + graceful_shutdown_timeout=timedelta(minutes=5), + interceptors=[SentryInterceptor()], + activity_executor=ThreadPoolExecutor(max_workers=max_concurrent_activities or 50), + # Only run one workflow at a time + max_concurrent_activities=1, + max_concurrent_workflow_task_polls=1, + max_concurrent_workflow_tasks=1, + max_cached_workflows=0, + ) + else: + worker = Worker( + client, + task_queue=task_queue, + workflows=workflows, + activities=activities, + workflow_runner=UnsandboxedWorkflowRunner(), + graceful_shutdown_timeout=timedelta(minutes=5), + interceptors=[SentryInterceptor()], + activity_executor=ThreadPoolExecutor(max_workers=max_concurrent_activities or 50), + max_concurrent_activities=max_concurrent_activities or 50, + max_concurrent_workflow_tasks=max_concurrent_workflow_tasks, + ) # catch the TERM signal, and stop the worker gracefully # https://github.com/temporalio/sdk-python#worker-shutdown