Skip to content

Commit

Permalink
add limiting
Browse files Browse the repository at this point in the history
  • Loading branch information
EDsCODE committed Dec 29, 2023
1 parent 0cce8d3 commit 1b698f5
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 6 deletions.
8 changes: 7 additions & 1 deletion posthog/temporal/data_imports/external_data_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,13 @@ async def run_external_data_job(inputs: ExternalDataJobInputs) -> None:
if not hubspot_access_code:
hubspot_access_code = refresh_access_token(refresh_token)

source = hubspot(api_key=hubspot_access_code, refresh_token=refresh_token, endpoints=tuple(inputs.schemas))
source = hubspot(
api_key=hubspot_access_code,
refresh_token=refresh_token,
job_id=str(model.id),
team_id=inputs.team_id,
endpoints=tuple(inputs.schemas),
)
else:
raise ValueError(f"Source type {model.pipeline.source_type} not supported")

Expand Down
16 changes: 11 additions & 5 deletions posthog/temporal/data_imports/pipelines/hubspot/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import dlt
from dlt.common.typing import TDataItems
from dlt.sources import DltResource
from posthog.temporal.data_imports.pipelines.helpers import limit_paginated_generator

from .helpers import (
fetch_data,
Expand All @@ -49,6 +50,8 @@
def hubspot(
api_key: str,
refresh_token: str,
job_id: str,
team_id: int,
endpoints: Sequence[str] = ("companies", "contacts", "deals", "tickets", "quotes"),
include_history: bool = False,
) -> Sequence[DltResource]:
Expand Down Expand Up @@ -84,15 +87,18 @@ def hubspot(
name=endpoint,
write_disposition="append",
)(
OBJECT_TYPE_SINGULAR[endpoint],
api_key,
refresh_token,
include_history,
DEFAULT_PROPS[endpoint],
object_type=OBJECT_TYPE_SINGULAR[endpoint],
api_key=api_key,
refresh_token=refresh_token,
include_history=include_history,
props=DEFAULT_PROPS[endpoint],
include_custom_props=True,
job_id=job_id,
team_id=team_id,
)


@limit_paginated_generator
def crm_objects(
object_type: str,
api_key: str,
Expand Down

0 comments on commit 1b698f5

Please sign in to comment.