Skip to content

Commit

Permalink
chore(data-warehouse): Only execute one workflow at a time (#27085)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gilbert09 authored Dec 20, 2024
1 parent 571e20b commit 5b7a31d
Showing 1 changed file with 30 additions and 12 deletions.
42 changes: 30 additions & 12 deletions posthog/temporal/common/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from temporalio.runtime import PrometheusConfig, Runtime, TelemetryConfig
from temporalio.worker import UnsandboxedWorkflowRunner, Worker

from posthog.constants import DATA_WAREHOUSE_TASK_QUEUE_V2
from posthog.temporal.common.client import connect
from posthog.temporal.common.sentry import SentryInterceptor

Expand Down Expand Up @@ -34,18 +35,35 @@ async def start_worker(
client_key,
runtime=runtime,
)
worker = Worker(
client,
task_queue=task_queue,
workflows=workflows,
activities=activities,
workflow_runner=UnsandboxedWorkflowRunner(),
graceful_shutdown_timeout=timedelta(minutes=5),
interceptors=[SentryInterceptor()],
activity_executor=ThreadPoolExecutor(max_workers=max_concurrent_activities or 50),
max_concurrent_activities=max_concurrent_activities or 50,
max_concurrent_workflow_tasks=max_concurrent_workflow_tasks,
)
if task_queue == DATA_WAREHOUSE_TASK_QUEUE_V2:
worker = Worker(
client,
task_queue=task_queue,
workflows=workflows,
activities=activities,
workflow_runner=UnsandboxedWorkflowRunner(),
graceful_shutdown_timeout=timedelta(minutes=5),
interceptors=[SentryInterceptor()],
activity_executor=ThreadPoolExecutor(max_workers=max_concurrent_activities or 50),
# Only run one workflow at a time
max_concurrent_activities=1,
max_concurrent_workflow_task_polls=1,
max_concurrent_workflow_tasks=1,
max_cached_workflows=0,
)
else:
worker = Worker(
client,
task_queue=task_queue,
workflows=workflows,
activities=activities,
workflow_runner=UnsandboxedWorkflowRunner(),
graceful_shutdown_timeout=timedelta(minutes=5),
interceptors=[SentryInterceptor()],
activity_executor=ThreadPoolExecutor(max_workers=max_concurrent_activities or 50),
max_concurrent_activities=max_concurrent_activities or 50,
max_concurrent_workflow_tasks=max_concurrent_workflow_tasks,
)

# catch the TERM signal, and stop the worker gracefully
# https://github.com/temporalio/sdk-python#worker-shutdown
Expand Down

0 comments on commit 5b7a31d

Please sign in to comment.