diff --git a/posthog/temporal/data_imports/external_data_job.py b/posthog/temporal/data_imports/external_data_job.py
index f83e7f9d87249..59508a2ee6f25 100644
--- a/posthog/temporal/data_imports/external_data_job.py
+++ b/posthog/temporal/data_imports/external_data_job.py
@@ -3,10 +3,13 @@
 import datetime as dt
 import json
 import re
+import threading
+import time
 
 from django.conf import settings
 from django.db import close_old_connections
 import posthoganalytics
+import psutil
 from temporalio import activity, exceptions, workflow
 from temporalio.common import RetryPolicy
 from temporalio.exceptions import WorkflowAlreadyStartedError
@@ -177,6 +180,32 @@ def create_source_templates(inputs: CreateSourceTemplateInputs) -> None:
     create_warehouse_templates_for_source(team_id=inputs.team_id, run_id=inputs.run_id)
 
 
+def log_memory_usage():
+    """Log the worker process's resident memory (RSS) every 10 seconds.
+
+    Runs an endless loop and never returns, so it must be run on a
+    background thread rather than called directly from worker code.
+    """
+    process = psutil.Process()  # no pid given: psutil inspects the current process
+    # NOTE(review): team_id=0 looks like a placeholder for "no team" -- confirm.
+    logger = bind_temporal_worker_logger_sync(team_id=0)
+
+    while True:
+        memory_info = process.memory_info()
+        # rss is reported in bytes; convert to MB for readability.
+        logger.info(f"Memory Usage: RSS = {memory_info.rss / (1024 * 1024):.2f} MB")
+
+        time.sleep(10)  # Log every 10 seconds
+
+
+# Start the memory logger at import time, but only when this process's
+# TEMPORAL_TASK_QUEUE setting equals DATA_WAREHOUSE_TASK_QUEUE_V2. daemon=True
+# keeps the endless loop from blocking interpreter shutdown.
+if settings.TEMPORAL_TASK_QUEUE == DATA_WAREHOUSE_TASK_QUEUE_V2:
+    thread = threading.Thread(target=log_memory_usage, daemon=True, name="memory-usage-logger")
+    thread.start()
+
+
 # TODO: update retry policies
 @workflow.defn(name="external-data-job")
 class ExternalDataJobWorkflow(PostHogWorkflow):