Skip to content

Commit

Permalink
chore(data-warehouse): Use the common temporal heartbeater (#23915)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gilbert09 authored Jul 23, 2024
1 parent cff159c commit eabb30c
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 14 deletions.
2 changes: 1 addition & 1 deletion posthog/temporal/data_imports/external_data_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ async def run(self, inputs: ExternalDataWorkflowInputs):
await workflow.execute_activity(
import_data_activity,
job_inputs,
heartbeat_timeout=dt.timedelta(minutes=1),
heartbeat_timeout=dt.timedelta(minutes=2),
**timeout_params,
) # type: ignore

Expand Down
15 changes: 2 additions & 13 deletions posthog/temporal/data_imports/workflow_activities/import_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from temporalio import activity

from posthog.temporal.common.heartbeat import Heartbeater
from posthog.temporal.data_imports.pipelines.helpers import aremove_reset_pipeline, aupdate_job_count

from posthog.temporal.data_imports.pipelines.pipeline import DataImportPipeline, PipelineInputs
Expand All @@ -13,7 +14,6 @@
get_external_data_job,
)
from posthog.temporal.common.logger import bind_temporal_worker_logger
import asyncio
from structlog.typing import FilteringBoundLogger
from posthog.warehouse.models.external_data_schema import ExternalDataSchema, aget_schema_by_id
from posthog.warehouse.models.ssh_tunnel import SSHTunnel
Expand Down Expand Up @@ -250,22 +250,11 @@ async def _run(
schema: ExternalDataSchema,
reset_pipeline: bool,
):
# Temp background heartbeat for now
async def heartbeat() -> None:
while True:
await asyncio.sleep(10)
activity.heartbeat()

heartbeat_task = asyncio.create_task(heartbeat())

try:
async with Heartbeater():
table_row_counts = await DataImportPipeline(
job_inputs, source, logger, reset_pipeline, schema.is_incremental
).run()
total_rows_synced = sum(table_row_counts.values())

await aupdate_job_count(inputs.run_id, inputs.team_id, total_rows_synced)
await aremove_reset_pipeline(inputs.source_id)
finally:
heartbeat_task.cancel()
await asyncio.wait([heartbeat_task])

0 comments on commit eabb30c

Please sign in to comment.