From 1b698f5c8ccaade940bed3c434242cfd744fde38 Mon Sep 17 00:00:00 2001 From: eric Date: Fri, 29 Dec 2023 12:08:27 -0500 Subject: [PATCH] add limiting --- .../temporal/data_imports/external_data_job.py | 8 +++++++- .../data_imports/pipelines/hubspot/__init__.py | 16 +++++++++++----- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/posthog/temporal/data_imports/external_data_job.py b/posthog/temporal/data_imports/external_data_job.py index 232cbe5e7b75d..3f9f0096a7e76 100644 --- a/posthog/temporal/data_imports/external_data_job.py +++ b/posthog/temporal/data_imports/external_data_job.py @@ -167,7 +167,13 @@ async def run_external_data_job(inputs: ExternalDataJobInputs) -> None: if not hubspot_access_code: hubspot_access_code = refresh_access_token(refresh_token) - source = hubspot(api_key=hubspot_access_code, refresh_token=refresh_token, endpoints=tuple(inputs.schemas)) + source = hubspot( + api_key=hubspot_access_code, + refresh_token=refresh_token, + job_id=str(model.id), + team_id=inputs.team_id, + endpoints=tuple(inputs.schemas), + ) else: raise ValueError(f"Source type {model.pipeline.source_type} not supported") diff --git a/posthog/temporal/data_imports/pipelines/hubspot/__init__.py b/posthog/temporal/data_imports/pipelines/hubspot/__init__.py index 656506af7a65a..0811e8851edd1 100644 --- a/posthog/temporal/data_imports/pipelines/hubspot/__init__.py +++ b/posthog/temporal/data_imports/pipelines/hubspot/__init__.py @@ -28,6 +28,7 @@ import dlt from dlt.common.typing import TDataItems from dlt.sources import DltResource +from posthog.temporal.data_imports.pipelines.helpers import limit_paginated_generator from .helpers import ( fetch_data, @@ -49,6 +50,8 @@ def hubspot( api_key: str, refresh_token: str, + job_id: str, + team_id: int, endpoints: Sequence[str] = ("companies", "contacts", "deals", "tickets", "quotes"), include_history: bool = False, ) -> Sequence[DltResource]: @@ -84,15 +87,18 @@ def hubspot( name=endpoint, write_disposition="append", )( - OBJECT_TYPE_SINGULAR[endpoint], - api_key, - refresh_token, - include_history, - DEFAULT_PROPS[endpoint], + object_type=OBJECT_TYPE_SINGULAR[endpoint], + api_key=api_key, + refresh_token=refresh_token, + include_history=include_history, + props=DEFAULT_PROPS[endpoint], include_custom_props=True, + job_id=job_id, + team_id=team_id, ) +@limit_paginated_generator def crm_objects( object_type: str, api_key: str,