From e4d36968f8ff006664d68b704c678075dc5de5e4 Mon Sep 17 00:00:00 2001 From: Tom Owers Date: Sat, 26 Oct 2024 08:16:23 +0100 Subject: [PATCH] fix(data-warehouse): Use a seperate thread pool executor for warehouse pipeline (#25831) Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com> --- posthog/temporal/data_imports/pipelines/pipeline.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/posthog/temporal/data_imports/pipelines/pipeline.py b/posthog/temporal/data_imports/pipelines/pipeline.py index 45aeb1860b29c..f0d9961f4e665 100644 --- a/posthog/temporal/data_imports/pipelines/pipeline.py +++ b/posthog/temporal/data_imports/pipelines/pipeline.py @@ -1,3 +1,4 @@ +from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass from typing import Literal from uuid import UUID @@ -253,7 +254,10 @@ def _run(self) -> dict[str, int]: async def run(self) -> dict[str, int]: try: - return await asyncio.to_thread(self._run) + # Use a dedicated thread pool to not interfere with the heartbeater thread + with ThreadPoolExecutor(max_workers=5) as pipeline_executor: + loop = asyncio.get_event_loop() + return await loop.run_in_executor(pipeline_executor, self._run) except PipelineStepFailed as e: self.logger.exception(f"Data import failed for endpoint with exception {e}", exc_info=e) raise