From eabb30c495004d13fc59a807f10a19fe3fb7d9d3 Mon Sep 17 00:00:00 2001 From: Tom Owers Date: Tue, 23 Jul 2024 18:25:05 +0100 Subject: [PATCH] chore(data-warehouse): Use the common temporal heartbeater (#23915) --- .../temporal/data_imports/external_data_job.py | 2 +- .../workflow_activities/import_data.py | 15 ++------------- 2 files changed, 3 insertions(+), 14 deletions(-) diff --git a/posthog/temporal/data_imports/external_data_job.py b/posthog/temporal/data_imports/external_data_job.py index 4fc6e10200866..058745c2753ec 100644 --- a/posthog/temporal/data_imports/external_data_job.py +++ b/posthog/temporal/data_imports/external_data_job.py @@ -177,7 +177,7 @@ async def run(self, inputs: ExternalDataWorkflowInputs): await workflow.execute_activity( import_data_activity, job_inputs, - heartbeat_timeout=dt.timedelta(minutes=1), + heartbeat_timeout=dt.timedelta(minutes=2), **timeout_params, ) # type: ignore diff --git a/posthog/temporal/data_imports/workflow_activities/import_data.py b/posthog/temporal/data_imports/workflow_activities/import_data.py index 2cba1697ef44e..9849339e785c7 100644 --- a/posthog/temporal/data_imports/workflow_activities/import_data.py +++ b/posthog/temporal/data_imports/workflow_activities/import_data.py @@ -4,6 +4,7 @@ from temporalio import activity +from posthog.temporal.common.heartbeat import Heartbeater from posthog.temporal.data_imports.pipelines.helpers import aremove_reset_pipeline, aupdate_job_count from posthog.temporal.data_imports.pipelines.pipeline import DataImportPipeline, PipelineInputs @@ -13,7 +14,6 @@ get_external_data_job, ) from posthog.temporal.common.logger import bind_temporal_worker_logger -import asyncio from structlog.typing import FilteringBoundLogger from posthog.warehouse.models.external_data_schema import ExternalDataSchema, aget_schema_by_id from posthog.warehouse.models.ssh_tunnel import SSHTunnel @@ -250,15 +250,7 @@ async def _run( schema: ExternalDataSchema, reset_pipeline: bool, ): - # Temp background heartbeat for now - async def heartbeat() -> None: - while True: - await asyncio.sleep(10) - activity.heartbeat() - - heartbeat_task = asyncio.create_task(heartbeat()) - - try: + async with Heartbeater(): table_row_counts = await DataImportPipeline( job_inputs, source, logger, reset_pipeline, schema.is_incremental ).run() @@ -266,6 +258,3 @@ async def heartbeat() -> None: await aupdate_job_count(inputs.run_id, inputs.team_id, total_rows_synced) await aremove_reset_pipeline(inputs.source_id) - finally: - heartbeat_task.cancel() - await asyncio.wait([heartbeat_task])