Skip to content

Commit

Permalink
fix(data-warehouse): Wrap trigger func (#27079)
Browse files Browse the repository at this point in the history
Gilbert09 authored Dec 19, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent d6906a5 commit 177a8f1
Showing 2 changed files with 18 additions and 14 deletions.
30 changes: 17 additions & 13 deletions posthog/temporal/data_imports/external_data_job.py
Original file line number Diff line number Diff line change
@@ -9,6 +9,8 @@
import posthoganalytics
from temporalio import activity, exceptions, workflow
from temporalio.common import RetryPolicy
from temporalio.exceptions import WorkflowAlreadyStartedError


from posthog.constants import DATA_WAREHOUSE_TASK_QUEUE_V2

@@ -144,20 +146,22 @@ def trigger_pipeline_v2(inputs: ExternalDataWorkflowInputs):
logger.debug("Triggering V2 pipeline")

temporal = sync_connect()

asyncio.run(
temporal.start_workflow(
workflow="external-data-job",
arg=dataclasses.asdict(inputs),
id=f"{inputs.external_data_schema_id}-V2",
task_queue=str(DATA_WAREHOUSE_TASK_QUEUE_V2),
retry_policy=RetryPolicy(
maximum_interval=dt.timedelta(seconds=60),
maximum_attempts=1,
non_retryable_error_types=["NondeterminismError"],
),
try:
asyncio.run(
temporal.start_workflow(
workflow="external-data-job",
arg=dataclasses.asdict(inputs),
id=f"{inputs.external_data_schema_id}-V2",
task_queue=str(DATA_WAREHOUSE_TASK_QUEUE_V2),
retry_policy=RetryPolicy(
maximum_interval=dt.timedelta(seconds=60),
maximum_attempts=1,
non_retryable_error_types=["NondeterminismError"],
),
)
)
)
except WorkflowAlreadyStartedError:
pass

logger.debug("V2 pipeline triggered")

Original file line number Diff line number Diff line change
@@ -366,7 +366,7 @@ def sql_table(
schema: Optional[str] = dlt.config.value,
metadata: Optional[MetaData] = None,
incremental: Optional[dlt.sources.incremental[Any]] = None,
chunk_size: int = 50000,
chunk_size: int = DEFAULT_CHUNK_SIZE,
backend: TableBackend = "sqlalchemy",
detect_precision_hints: Optional[bool] = None,
reflection_level: Optional[ReflectionLevel] = "full",

0 comments on commit 177a8f1

Please sign in to comment.