diff --git a/frontend/src/scenes/data-warehouse/external/sourceModalLogic.ts b/frontend/src/scenes/data-warehouse/external/sourceModalLogic.ts index 00572e8a5b3d1..d1b8c615be1a6 100644 --- a/frontend/src/scenes/data-warehouse/external/sourceModalLogic.ts +++ b/frontend/src/scenes/data-warehouse/external/sourceModalLogic.ts @@ -103,7 +103,13 @@ export const sourceModalLogic = kea([ return null } - const scopes = ['crm.objects.contacts.read', 'crm.objects.companies.read'] + const scopes = [ + 'crm.objects.contacts.read', + 'crm.objects.companies.read', + 'crm.objects.deals.read', + 'tickets', + 'crm.objects.quotes.read', + ] const params = new URLSearchParams() params.set('client_id', clientId) @@ -134,6 +140,7 @@ export const sourceModalLogic = kea([ }, }) lemonToast.success(`Oauth successful.`) + actions.loadSources() } catch (e) { lemonToast.error(`Something went wrong. Please try again.`) } finally { diff --git a/posthog/temporal/data_imports/external_data_job.py b/posthog/temporal/data_imports/external_data_job.py index cdb218c0cce31..7f278b6a5c4fa 100644 --- a/posthog/temporal/data_imports/external_data_job.py +++ b/posthog/temporal/data_imports/external_data_job.py @@ -155,6 +155,19 @@ async def run_external_data_job(inputs: ExternalDataJobInputs) -> None: source = stripe_source( api_key=stripe_secret_key, endpoints=tuple(inputs.schemas), job_id=str(model.id), team_id=inputs.team_id ) + elif model.pipeline.source_type == ExternalDataSource.Type.HUBSPOT: + from posthog.temporal.data_imports.pipelines.hubspot.auth import refresh_access_token + from posthog.temporal.data_imports.pipelines.hubspot import hubspot + + hubspot_access_code = model.pipeline.job_inputs.get("hubspot_secret_key", None) + + if not hubspot_access_code: + refresh_token = model.pipeline.job_inputs.get("hubspot_refresh_token", None) + if not refresh_token: + raise ValueError(f"Hubspot refresh token not found for job {model.id}") + hubspot_access_code = refresh_access_token(refresh_token) + + source = hubspot(api_key=hubspot_access_code, endpoints=tuple(inputs.schemas)) else: raise ValueError(f"Source type {model.pipeline.source_type} not supported") diff --git a/posthog/temporal/data_imports/pipelines/hubspot/__init__.py b/posthog/temporal/data_imports/pipelines/hubspot/__init__.py new file mode 100644 index 0000000000000..2c0b6f4d63ca0 --- /dev/null +++ b/posthog/temporal/data_imports/pipelines/hubspot/__init__.py @@ -0,0 +1,287 @@ +""" +This is a module that provides a DLT source to retrieve data from multiple endpoints of the HubSpot API using a specified API key. The retrieved data is returned as a tuple of Dlt resources, one for each endpoint. + +The source retrieves data from the following endpoints: +- CRM Companies +- CRM Contacts +- CRM Deals +- CRM Tickets +- CRM Products +- CRM Quotes +- Web Analytics Events + +For each endpoint, a resource and transformer function are defined to retrieve data and transform it to a common format. +The resource functions yield the raw data retrieved from the API, while the transformer functions are used to retrieve +additional information from the Web Analytics Events endpoint. + +The source also supports enabling Web Analytics Events for each endpoint by setting the corresponding enable flag to True. + +Example: +To retrieve data from all endpoints, use the following code: + +python + +>>> resources = hubspot(api_key="your_api_key") +""" + +from typing import Any, Dict, List, Literal, Sequence, Iterator +from urllib.parse import quote + +import dlt +from dlt.common import pendulum +from dlt.common.typing import TDataItems, TDataItem +from dlt.sources import DltResource + +from .helpers import ( + fetch_data, + _get_property_names, + fetch_property_history, +) +from .settings import ( + ALL, + CRM_OBJECT_ENDPOINTS, + DEFAULT_COMPANY_PROPS, + DEFAULT_CONTACT_PROPS, + DEFAULT_DEAL_PROPS, + DEFAULT_PRODUCT_PROPS, + DEFAULT_TICKET_PROPS, + DEFAULT_QUOTE_PROPS, + OBJECT_TYPE_PLURAL, + STARTDATE, + WEB_ANALYTICS_EVENTS_ENDPOINT, +) + +THubspotObjectType = Literal["company", "contact", "deal", "ticket", "product", "quote"] + + +@dlt.source(name="hubspot") +def hubspot( + api_key: str = dlt.secrets.value, + endpoints: Sequence[THubspotObjectType] = ("company", "contact", "deal", "ticket", "product", "quote"), + include_history: bool = False, +) -> Sequence[DltResource]: + """ + A DLT source that retrieves data from the HubSpot API using the + specified API key. + + This function retrieves data for several HubSpot API endpoints, + including companies, contacts, deals, tickets, products and web + analytics events. It returns a tuple of Dlt resources, one for + each endpoint. + + Args: + api_key (Optional[str]): + The API key used to authenticate with the HubSpot API. Defaults + to dlt.secrets.value. + include_history (Optional[bool]): + Whether to load history of property changes along with entities. + The history entries are loaded to separate tables. + + Returns: + Sequence[DltResource]: Dlt resources, one for each HubSpot API endpoint. + + Notes: + This function uses the `fetch_data` function to retrieve data from the + HubSpot CRM API. The API key is passed to `fetch_data` as the + `api_key` argument. + """ + + @dlt.resource(name="companies", write_disposition="replace") + def companies( + api_key: str = api_key, + include_history: bool = include_history, + props: Sequence[str] = DEFAULT_COMPANY_PROPS, + include_custom_props: bool = True, + ) -> Iterator[TDataItems]: + """Hubspot companies resource""" + yield from crm_objects( + "company", + api_key, + include_history=include_history, + props=props, + include_custom_props=include_custom_props, + ) + + @dlt.resource(name="contacts", write_disposition="replace") + def contacts( + api_key: str = api_key, + include_history: bool = include_history, + props: Sequence[str] = DEFAULT_CONTACT_PROPS, + include_custom_props: bool = True, + ) -> Iterator[TDataItems]: + """Hubspot contacts resource""" + yield from crm_objects( + "contact", + api_key, + include_history, + props, + include_custom_props, + ) + + @dlt.resource(name="deals", write_disposition="replace") + def deals( + api_key: str = api_key, + include_history: bool = include_history, + props: Sequence[str] = DEFAULT_DEAL_PROPS, + include_custom_props: bool = True, + ) -> Iterator[TDataItems]: + """Hubspot deals resource""" + yield from crm_objects( + "deal", + api_key, + include_history, + props, + include_custom_props, + ) + + @dlt.resource(name="tickets", write_disposition="replace") + def tickets( + api_key: str = api_key, + include_history: bool = include_history, + props: Sequence[str] = DEFAULT_TICKET_PROPS, + include_custom_props: bool = True, + ) -> Iterator[TDataItems]: + """Hubspot tickets resource""" + yield from crm_objects( + "ticket", + api_key, + include_history, + props, + include_custom_props, + ) + + @dlt.resource(name="products", write_disposition="replace") + def products( + api_key: str = api_key, + include_history: bool = include_history, + props: Sequence[str] = DEFAULT_PRODUCT_PROPS, + include_custom_props: bool = True, + ) -> Iterator[TDataItems]: + """Hubspot products resource""" + yield from crm_objects( + "product", + api_key, + include_history, + props, + include_custom_props, + ) + + @dlt.resource(name="quotes", write_disposition="replace") + def quotes( + api_key: str = api_key, + include_history: bool = include_history, + props: Sequence[str] = DEFAULT_QUOTE_PROPS, + include_custom_props: bool = True, + ) -> Iterator[TDataItems]: + """Hubspot quotes resource""" + yield from crm_objects( + "quote", + api_key, + include_history, + props, + include_custom_props, + ) + + return companies, contacts + + +def crm_objects( + object_type: str, + api_key: str = dlt.secrets.value, + include_history: bool = False, + props: Sequence[str] = None, + include_custom_props: bool = True, +) -> Iterator[TDataItems]: + """Building blocks for CRM resources.""" + if props == ALL: + props = list(_get_property_names(api_key, object_type)) + + if include_custom_props: + all_props = _get_property_names(api_key, object_type) + custom_props = [prop for prop in all_props if not prop.startswith("hs_")] + props = props + custom_props # type: ignore + + props = ",".join(sorted(list(set(props)))) + + if len(props) > 2000: + raise ValueError( + ( + "Your request to Hubspot is too long to process. " + "Maximum allowed query length is 2000 symbols, while " + f"your list of properties `{props[:200]}`... is {len(props)} " + "symbols long. Use the `props` argument of the resource to " + "set the list of properties to extract from the endpoint." + ) + ) + + params = {"properties": props, "limit": 100} + + yield from fetch_data(CRM_OBJECT_ENDPOINTS[object_type], api_key, params=params) + if include_history: + # Get history separately, as requesting both all properties and history together + # is likely to hit hubspot's URL length limit + for history_entries in fetch_property_history( + CRM_OBJECT_ENDPOINTS[object_type], + api_key, + props, + ): + yield dlt.mark.with_table_name( + history_entries, + OBJECT_TYPE_PLURAL[object_type] + "_property_history", + ) + + +@dlt.resource +def hubspot_events_for_objects( + object_type: THubspotObjectType, + object_ids: List[str], + api_key: str = dlt.secrets.value, + start_date: pendulum.DateTime = STARTDATE, +) -> DltResource: + """ + A standalone DLT resources that retrieves web analytics events from the HubSpot API for a particular object type and list of object ids. + + Args: + object_type(THubspotObjectType, required): One of the hubspot object types see definition of THubspotObjectType literal + object_ids: (List[THubspotObjectType], required): List of object ids to track events + api_key (str, optional): The API key used to authenticate with the HubSpot API. Defaults to dlt.secrets.value. + start_date (datetime, optional): The initial date time from which start getting events, default to STARTDATE + + Returns: + incremental dlt resource to track events for objects from the list + """ + + end_date = pendulum.now().isoformat() + name = object_type + "_events" + + def get_web_analytics_events( + occurred_at: dlt.sources.incremental[str], + ) -> Iterator[List[Dict[str, Any]]]: + """ + A helper function that retrieves web analytics events for a given object type from the HubSpot API. + + Args: + object_type (str): The type of object for which to retrieve web analytics events. + + Yields: + dict: A dictionary representing a web analytics event. + """ + for object_id in object_ids: + yield from fetch_data( + WEB_ANALYTICS_EVENTS_ENDPOINT.format( + objectType=object_type, + objectId=object_id, + occurredAfter=quote(occurred_at.last_value), + occurredBefore=quote(end_date), + ), + api_key=api_key, + ) + + return dlt.resource( + get_web_analytics_events, + name=name, + primary_key="id", + write_disposition="append", + selected=True, + table_name=lambda e: name + "_" + str(e["eventType"]), + )(dlt.sources.incremental("occurredAt", initial_value=start_date.isoformat())) diff --git a/posthog/temporal/data_imports/pipelines/hubspot/helpers.py b/posthog/temporal/data_imports/pipelines/hubspot/helpers.py new file mode 100644 index 0000000000000..1570e1897b07b --- /dev/null +++ b/posthog/temporal/data_imports/pipelines/hubspot/helpers.py @@ -0,0 +1,184 @@ +"""Hubspot source helpers""" + +import urllib.parse +from typing import Iterator, Dict, Any, List, Optional + +from dlt.sources.helpers import requests +from .settings import OBJECT_TYPE_PLURAL + +BASE_URL = "https://api.hubapi.com/" + + +def get_url(endpoint: str) -> str: + """Get absolute hubspot endpoint URL""" + return urllib.parse.urljoin(BASE_URL, endpoint) + + +def _get_headers(api_key: str) -> Dict[str, str]: + """ + Return a dictionary of HTTP headers to use for API requests, including the specified API key. + + Args: + api_key (str): The API key to use for authentication, as a string. + + Returns: + dict: A dictionary of HTTP headers to include in API requests, with the `Authorization` header + set to the specified API key in the format `Bearer {api_key}`. + + """ + # Construct the dictionary of HTTP headers to use for API requests + return dict(authorization=f"Bearer {api_key}") + + +def extract_property_history(objects: List[Dict[str, Any]]) -> Iterator[Dict[str, Any]]: + for item in objects: + history = item.get("propertiesWithHistory") + if not history: + return + # Yield a flat list of property history entries + for key, changes in history.items(): + if not changes: + continue + for entry in changes: + yield {"object_id": item["id"], "property_name": key, **entry} + + +def fetch_property_history( + endpoint: str, + api_key: str, + props: str, + params: Optional[Dict[str, Any]] = None, +) -> Iterator[List[Dict[str, Any]]]: + """Fetch property history from the given CRM endpoint. + + Args: + endpoint: The endpoint to fetch data from, as a string. + api_key: The API key to use for authentication, as a string. + props: A comma separated list of properties to retrieve the history for + params: Optional dict of query params to include in the request + + Yields: + List of property history entries (dicts) + """ + # Construct the URL and headers for the API request + url = get_url(endpoint) + headers = _get_headers(api_key) + + params = dict(params or {}) + params["propertiesWithHistory"] = props + params["limit"] = 50 + # Make the API request + r = requests.get(url, headers=headers, params=params) + # Parse the API response and yield the properties of each result + + # Parse the response JSON data + _data = r.json() + while _data is not None: + if "results" in _data: + yield list(extract_property_history(_data["results"])) + + # Follow pagination links if they exist + _next = _data.get("paging", {}).get("next", None) + if _next: + next_url = _next["link"] + # Get the next page response + r = requests.get(next_url, headers=headers) + _data = r.json() + else: + _data = None + + +def fetch_data(endpoint: str, api_key: str, params: Optional[Dict[str, Any]] = None) -> Iterator[List[Dict[str, Any]]]: + """ + Fetch data from HUBSPOT endpoint using a specified API key and yield the properties of each result. + For paginated endpoint this function yields item from all pages. + + Args: + endpoint (str): The endpoint to fetch data from, as a string. + api_key (str): The API key to use for authentication, as a string. + params: Optional dict of query params to include in the request + + Yields: + A List of CRM object dicts + + Raises: + requests.exceptions.HTTPError: If the API returns an HTTP error status code. + + Notes: + This function uses the `requests` library to make a GET request to the specified endpoint, with + the API key included in the headers. If the API returns a non-successful HTTP status code (e.g. + 404 Not Found), a `requests.exceptions.HTTPError` exception will be raised. + + The `endpoint` argument should be a relative URL, which will be appended to the base URL for the + API. The `params` argument is used to pass additional query parameters to the request + + This function also includes a retry decorator that will automatically retry the API call up to + 3 times with a 5-second delay between retries, using an exponential backoff strategy. + """ + # Construct the URL and headers for the API request + url = get_url(endpoint) + headers = _get_headers(api_key) + + # Make the API request + r = requests.get(url, headers=headers, params=params) + # Parse the API response and yield the properties of each result + # Parse the response JSON data + _data = r.json() + # Yield the properties of each result in the API response + while _data is not None: + if "results" in _data: + _objects: List[Dict[str, Any]] = [] + for _result in _data["results"]: + _obj = _result.get("properties", _result) + if "id" not in _obj and "id" in _result: + # Move id from properties to top level + _obj["id"] = _result["id"] + if "associations" in _result: + for association in _result["associations"]: + __values = [ + { + "value": _obj["hs_object_id"], + f"{association}_id": __r["id"], + } + for __r in _result["associations"][association]["results"] + ] + + # remove duplicates from list of dicts + __values = [dict(t) for t in {tuple(d.items()) for d in __values}] + + _obj[association] = __values + _objects.append(_obj) + + yield _objects + + # Follow pagination links if they exist + _next = _data.get("paging", {}).get("next", None) + if _next: + next_url = _next["link"] + # Get the next page response + r = requests.get(next_url, headers=headers) + _data = r.json() + else: + _data = None + + +def _get_property_names(api_key: str, object_type: str) -> List[str]: + """ + Retrieve property names for a given entity from the HubSpot API. + + Args: + entity: The entity name for which to retrieve property names. + + Returns: + A list of property names. + + Raises: + Exception: If an error occurs during the API request. + """ + properties = [] + endpoint = f"/crm/v3/properties/{OBJECT_TYPE_PLURAL[object_type]}" + + for page in fetch_data(endpoint, api_key): + properties.extend([prop["name"] for prop in page]) + + return properties diff --git a/posthog/temporal/data_imports/pipelines/hubspot/settings.py b/posthog/temporal/data_imports/pipelines/hubspot/settings.py new file mode 100644 index 0000000000000..4fe36f7206c86 --- /dev/null +++ b/posthog/temporal/data_imports/pipelines/hubspot/settings.py @@ -0,0 +1,112 @@ +"""Hubspot source settings and constants""" + +from dlt.common import pendulum + +STARTDATE = pendulum.datetime(year=2000, month=1, day=1) + +CONTACT = "contact" +COMPANY = "company" +DEAL = "deal" +PRODUCT = "product" +TICKET = "ticket" +QUOTE = "quote" + +CRM_CONTACTS_ENDPOINT = "/crm/v3/objects/contacts?associations=deals,products,tickets,quotes" +CRM_COMPANIES_ENDPOINT = "/crm/v3/objects/companies?associations=contacts,deals,products,tickets,quotes" +CRM_DEALS_ENDPOINT = "/crm/v3/objects/deals" +CRM_PRODUCTS_ENDPOINT = "/crm/v3/objects/products" +CRM_TICKETS_ENDPOINT = "/crm/v3/objects/tickets" +CRM_QUOTES_ENDPOINT = "/crm/v3/objects/quotes" + +CRM_OBJECT_ENDPOINTS = { + CONTACT: CRM_CONTACTS_ENDPOINT, + COMPANY: CRM_COMPANIES_ENDPOINT, + DEAL: CRM_DEALS_ENDPOINT, + PRODUCT: CRM_PRODUCTS_ENDPOINT, + TICKET: CRM_TICKETS_ENDPOINT, + QUOTE: CRM_QUOTES_ENDPOINT, +} + +WEB_ANALYTICS_EVENTS_ENDPOINT = "/events/v3/events?objectType={objectType}&objectId={objectId}&occurredAfter={occurredAfter}&occurredBefore={occurredBefore}&sort=-occurredAt" + +OBJECT_TYPE_SINGULAR = { + "companies": COMPANY, + "contacts": CONTACT, + "deals": DEAL, + "tickets": TICKET, + "products": PRODUCT, + "quotes": QUOTE, +} + +OBJECT_TYPE_PLURAL = {v: k for k, v in OBJECT_TYPE_SINGULAR.items()} + + +ENDPOINTS = ( + OBJECT_TYPE_PLURAL[CONTACT], + OBJECT_TYPE_PLURAL[DEAL], + OBJECT_TYPE_PLURAL[COMPANY], + OBJECT_TYPE_PLURAL[TICKET], + OBJECT_TYPE_PLURAL[PRODUCT], + OBJECT_TYPE_PLURAL[QUOTE], +) + +DEFAULT_DEAL_PROPS = [ + "amount", + "closedate", + "createdate", + "dealname", + "dealstage", + "hs_lastmodifieddate", + "hs_object_id", + "pipeline", +] + +DEFAULT_COMPANY_PROPS = [ + "createdate", + "domain", + "hs_lastmodifieddate", + "hs_object_id", + "name", +] + +DEFAULT_CONTACT_PROPS = [ + "createdate", + "email", + "firstname", + "hs_object_id", + "lastmodifieddate", + "lastname", +] + +DEFAULT_TICKET_PROPS = [ + "createdate", + "content", + "hs_lastmodifieddate", + "hs_object_id", + "hs_pipeline", + "hs_pipeline_stage", + "hs_ticket_category", + "hs_ticket_priority", + "subject", +] + +DEFAULT_PRODUCT_PROPS = [ + "createdate", + "description", + "hs_lastmodifieddate", + "hs_object_id", + "name", + "price", +] + +DEFAULT_QUOTE_PROPS = [ + "hs_createdate", + "hs_expiration_date", + "hs_lastmodifieddate", + "hs_object_id", + "hs_public_url_key", + "hs_status", + "hs_title", +] + +ALL = ("ALL",) diff --git a/posthog/temporal/data_imports/pipelines/schemas.py b/posthog/temporal/data_imports/pipelines/schemas.py index a62db7d664e40..eaaa431d7aef9 100644 --- a/posthog/temporal/data_imports/pipelines/schemas.py +++ b/posthog/temporal/data_imports/pipelines/schemas.py @@ -1,4 +1,8 @@ from posthog.warehouse.models import ExternalDataSource -from posthog.temporal.data_imports.pipelines.stripe.settings import ENDPOINTS +from posthog.temporal.data_imports.pipelines.stripe.settings import ENDPOINTS as STRIPE_ENDPOINTS +from posthog.temporal.data_imports.pipelines.hubspot.settings import ENDPOINTS as HUBSPOT_ENDPOINTS -PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING = {ExternalDataSource.Type.STRIPE: ENDPOINTS} +PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING = { + ExternalDataSource.Type.STRIPE: STRIPE_ENDPOINTS, + ExternalDataSource.Type.HUBSPOT: HUBSPOT_ENDPOINTS, +}