diff --git a/posthog/temporal/data_imports/external_data_job.py b/posthog/temporal/data_imports/external_data_job.py
index 62a1e1bc834ed..f83e7f9d87249 100644
--- a/posthog/temporal/data_imports/external_data_job.py
+++ b/posthog/temporal/data_imports/external_data_job.py
@@ -9,6 +9,8 @@
 import posthoganalytics
 from temporalio import activity, exceptions, workflow
 from temporalio.common import RetryPolicy
+from temporalio.exceptions import WorkflowAlreadyStartedError
+
 from posthog.constants import DATA_WAREHOUSE_TASK_QUEUE_V2
@@ -144,20 +146,22 @@ def trigger_pipeline_v2(inputs: ExternalDataWorkflowInputs):
     logger.debug("Triggering V2 pipeline")

     temporal = sync_connect()
-
-    asyncio.run(
-        temporal.start_workflow(
-            workflow="external-data-job",
-            arg=dataclasses.asdict(inputs),
-            id=f"{inputs.external_data_schema_id}-V2",
-            task_queue=str(DATA_WAREHOUSE_TASK_QUEUE_V2),
-            retry_policy=RetryPolicy(
-                maximum_interval=dt.timedelta(seconds=60),
-                maximum_attempts=1,
-                non_retryable_error_types=["NondeterminismError"],
-            ),
+    try:
+        asyncio.run(
+            temporal.start_workflow(
+                workflow="external-data-job",
+                arg=dataclasses.asdict(inputs),
+                id=f"{inputs.external_data_schema_id}-V2",
+                task_queue=str(DATA_WAREHOUSE_TASK_QUEUE_V2),
+                retry_policy=RetryPolicy(
+                    maximum_interval=dt.timedelta(seconds=60),
+                    maximum_attempts=1,
+                    non_retryable_error_types=["NondeterminismError"],
+                ),
+            )
         )
-    )
+    except WorkflowAlreadyStartedError:
+        pass

     logger.debug("V2 pipeline triggered")

diff --git a/posthog/temporal/data_imports/pipelines/sql_database_v2/__init__.py b/posthog/temporal/data_imports/pipelines/sql_database_v2/__init__.py
index de868d62ee51d..0c49b04ba1d1c 100644
--- a/posthog/temporal/data_imports/pipelines/sql_database_v2/__init__.py
+++ b/posthog/temporal/data_imports/pipelines/sql_database_v2/__init__.py
@@ -366,7 +366,7 @@ def sql_table(
     schema: Optional[str] = dlt.config.value,
     metadata: Optional[MetaData] = None,
     incremental: Optional[dlt.sources.incremental[Any]] = None,
-    chunk_size: int = 50000,
+    chunk_size: int = DEFAULT_CHUNK_SIZE,
     backend: TableBackend = "sqlalchemy",
     detect_precision_hints: Optional[bool] = None,
     reflection_level: Optional[ReflectionLevel] = "full",
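
The first hunk makes triggering the V2 pipeline idempotent: Temporal enforces workflow-ID uniqueness per namespace, so re-triggering a schema that already has a workflow in flight raises WorkflowAlreadyStartedError rather than starting a duplicate run, and the patch now swallows that error. Below is a minimal standalone sketch of the same pattern, assuming a locally reachable Temporal server and an already-registered "external-data-job" workflow; the connection target and queue name are illustrative stand-ins, not values from the patch (PostHog's sync_connect() handles client setup in the real code).

import asyncio

from temporalio.client import Client
from temporalio.exceptions import WorkflowAlreadyStartedError


async def trigger_once(schema_id: str) -> None:
    # Illustrative connection target (assumption, not from the patch).
    client = await Client.connect("localhost:7233")
    try:
        # Workflow IDs are unique within a namespace: a second start_workflow
        # call with the same ID raises instead of spawning a duplicate run.
        await client.start_workflow(
            "external-data-job",
            id=f"{schema_id}-V2",
            task_queue="data-warehouse-task-queue-v2",  # assumed queue name
        )
    except WorkflowAlreadyStartedError:
        # A V2 job for this schema is already running; treat as a no-op.
        pass


asyncio.run(trigger_once("example-schema-id"))

The second file's change is a plain constant extraction: the magic default 50000 for chunk_size is replaced by DEFAULT_CHUNK_SIZE, presumably defined or imported elsewhere in the module, so the default lives in one place for all source functions that page rows out of the database.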