diff --git a/posthog/temporal/data_imports/external_data_job.py b/posthog/temporal/data_imports/external_data_job.py
index 5c8634cb61a6e..49b94f268279a 100644
--- a/posthog/temporal/data_imports/external_data_job.py
+++ b/posthog/temporal/data_imports/external_data_job.py
@@ -194,9 +194,8 @@ async def run(self, inputs: ExternalDataWorkflowInputs):
         await workflow.execute_activity(
             run_external_data_job,
             job_inputs,
-            start_to_close_timeout=dt.timedelta(minutes=120),
-            retry_policy=RetryPolicy(maximum_attempts=10),
-            heartbeat_timeout=dt.timedelta(seconds=60),
+            start_to_close_timeout=dt.timedelta(minutes=90),
+            retry_policy=RetryPolicy(maximum_attempts=5),
         )
 
         # check schema first
diff --git a/posthog/temporal/data_imports/pipelines/stripe/helpers.py b/posthog/temporal/data_imports/pipelines/stripe/helpers.py
index 4f5ee99b40d9a..2e9bba272d5fc 100644
--- a/posthog/temporal/data_imports/pipelines/stripe/helpers.py
+++ b/posthog/temporal/data_imports/pipelines/stripe/helpers.py
@@ -98,6 +98,12 @@ def stripe_source(
         stripe_pagination,
         name=endpoint,
         write_disposition="append",
+        columns={
+            "metadata": {
+                "data_type": "complex",
+                "nullable": True,
+            }
+        },
     )(
         api_key=api_key,
         endpoint=endpoint,
diff --git a/posthog/temporal/data_imports/pipelines/stripe/stripe_pipeline.py b/posthog/temporal/data_imports/pipelines/stripe/stripe_pipeline.py
index 95217b810298b..b99a14f438bce 100644
--- a/posthog/temporal/data_imports/pipelines/stripe/stripe_pipeline.py
+++ b/posthog/temporal/data_imports/pipelines/stripe/stripe_pipeline.py
@@ -12,12 +12,6 @@
 from posthog.temporal.common.logger import bind_temporal_worker_logger
 import os
 
-from temporalio import activity
-from posthog.temporal.common.utils import (
-    DataImportHeartbeatDetails,
-    should_resume_from_activity_heartbeat,
-)
-import asyncio
 
 
 @dataclass
@@ -73,34 +67,13 @@ async def run_stripe_pipeline(inputs: StripeJobInputs) -> None:
         logger.info(f"No schemas found for source id {inputs.source_id}")
         return
 
-    should_resume, details = await should_resume_from_activity_heartbeat(activity, DataImportHeartbeatDetails, logger)
-
-    if should_resume and details:
-        schemas = schemas[schemas.index(details.endpoint) :]
-        logger.info(f"Resuming from {details.endpoint} with cursor {details.cursor}")
-
-    endpoint = schemas[0]
-    cursor = None
-
-    async def worker_shutdown_handler():
-        """Handle the Worker shutting down by heart-beating our latest status."""
-        await activity.wait_for_worker_shutdown()
-        activity.heartbeat(endpoint, cursor)
-
-    asyncio.create_task(worker_shutdown_handler())
-
     for endpoint in schemas:
-        if should_resume and details and endpoint == details.endpoint:
-            starting_after = details.cursor
-        else:
-            starting_after = None
-
         pipeline = create_pipeline(inputs)
 
         try:
-            source = stripe_source(inputs.stripe_secret_key, endpoint, starting_after=starting_after)
+            source = stripe_source(inputs.stripe_secret_key, endpoint)
             pipeline.run(source, table_name=endpoint.lower(), loader_file_format="parquet")
         except PipelineStepFailed:
-            logger.error(f"Data import failed for endpoint {endpoint} with cursor {cursor}")
+            logger.error(f"Data import failed for endpoint {endpoint}")
             raise