From d59313e006e184407958b6f65e6cbe4f6a830caf Mon Sep 17 00:00:00 2001
From: Tom Owers
Date: Tue, 18 Jun 2024 14:48:53 +0100
Subject: [PATCH] feat(data-warehouse): DLT Zendesk conversion (#22876)

* Added job cancellation checking into rest source
* Remove chunk size
* fixed mypy
* Fixed test
* fixed mypy
* Fixed test
* WIP
* Get the right zendesk tables set as incremental
* Final fixes and tests for zendesk
* Fixed mypy
* Updated incremental table names
---
 mypy-baseline.txt                                  |  17 +-
 .../data_imports/pipelines/schemas.py              |   8 +-
 .../pipelines/zendesk/__init__.py                  | 271 +++++++++++
 .../pipelines/zendesk/api_helpers.py               | 103 ----
 .../pipelines/zendesk/credentials.py               |  51 --
 .../data_imports/pipelines/zendesk/helpers.py      | 451 ------------------
 .../pipelines/zendesk/settings.py                  |   1 +
 .../pipelines/zendesk/talk_api.py                  | 115 -----
 .../workflow_activities/import_data.py             |  21 +-
 .../temporal/tests/data_imports/conftest.py        | 381 +++++++++++++++
 .../tests/data_imports/test_end_to_end.py          | 153 ++++++
 11 files changed, 829 insertions(+), 743 deletions(-)
 create mode 100644 posthog/temporal/data_imports/pipelines/zendesk/__init__.py
 delete mode 100644 posthog/temporal/data_imports/pipelines/zendesk/api_helpers.py
 delete mode 100644 posthog/temporal/data_imports/pipelines/zendesk/credentials.py
 delete mode 100644 posthog/temporal/data_imports/pipelines/zendesk/helpers.py
 delete mode 100644 posthog/temporal/data_imports/pipelines/zendesk/talk_api.py

diff --git a/mypy-baseline.txt b/mypy-baseline.txt
index e4d8a875c56cd..2cf329590acf4 100644
--- a/mypy-baseline.txt
+++ b/mypy-baseline.txt
@@ -2,7 +2,6 @@ posthog/temporal/common/utils.py:0: error: Argument 1 to "abstractclassmethod" h
 posthog/temporal/common/utils.py:0: note: This is likely because "from_activity" has named arguments: "cls". Consider marking them positional-only
 posthog/temporal/common/utils.py:0: error: Argument 2 to "__get__" of "classmethod" has incompatible type "type[HeartbeatType]"; expected "type[Never]" [arg-type]
 posthog/warehouse/models/ssh_tunnel.py:0: error: Incompatible types in assignment (expression has type "NoEncryption", variable has type "BestAvailableEncryption") [assignment]
-posthog/temporal/data_imports/pipelines/zendesk/talk_api.py:0: error: Incompatible types in assignment (expression has type "None", variable has type "str") [assignment]
 posthog/temporal/data_imports/pipelines/rest_source/config_setup.py:0: error: Dict entry 2 has incompatible type "Literal['auto']": "None"; expected "Literal['json_response', 'header_link', 'auto', 'single_page', 'cursor', 'offset', 'page_number']": "type[BasePaginator]" [dict-item]
 posthog/temporal/data_imports/pipelines/rest_source/config_setup.py:0: error: Incompatible types in assignment (expression has type "None", variable has type "AuthConfigBase") [assignment]
 posthog/temporal/data_imports/pipelines/rest_source/config_setup.py:0: error: Argument 1 to "get_auth_class" has incompatible type "Literal['bearer', 'api_key', 'http_basic'] | None"; expected "Literal['bearer', 'api_key', 'http_basic']" [arg-type]
@@ -550,13 +549,6 @@ posthog/api/test/test_exports.py:0: error: Incompatible types in assignment (exp
 posthog/api/notebook.py:0: error: Incompatible types in assignment (expression has type "int", variable has type "str | None") [assignment]
 posthog/warehouse/data_load/validate_schema.py:0: error: Incompatible types in assignment (expression has type "dict[str, dict[str, str | bool]] | dict[str, str]", variable has type "dict[str, dict[str, str]]") [assignment]
 posthog/warehouse/api/table.py:0: error: Unsupported target for indexed assignment ("dict[str, str | bool] | str") [index]
-posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "DateTime | Date | datetime | date | str | float | int | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type]
-posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "str | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type]
-posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "DateTime | Date | datetime | date | str | float | int | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type]
-posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "str | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type]
-posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "DateTime | Date | datetime | date | str | float | int | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type]
-posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Item "None" of "DateTime | None" has no attribute "int_timestamp" [union-attr]
-posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "str | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type]
 posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Not all union combinations were tried because there are too many unions [misc]
 posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument 2 to "source" has incompatible type "str | None"; expected "str" [arg-type]
 posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument 3 to "source" has incompatible type "str | None"; expected "str" [arg-type]
@@ -668,6 +660,15 @@ posthog/temporal/tests/batch_exports/test_run_updates.py:0: error: Unused "type:
 posthog/temporal/tests/batch_exports/test_run_updates.py:0: error: Unused "type: ignore" comment [unused-ignore]
 posthog/temporal/tests/batch_exports/test_run_updates.py:0: error: Unused "type: ignore" comment [unused-ignore]
 posthog/temporal/tests/batch_exports/test_run_updates.py:0: error: Unused "type: ignore" comment [unused-ignore]
 posthog/temporal/tests/batch_exports/test_batch_exports.py:0: error: TypedDict key must be a string literal; expected one of ("_timestamp", "created_at", "distinct_id", "elements", "elements_chain", ...) [literal-required]
+posthog/temporal/data_imports/pipelines/zendesk/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
+posthog/temporal/data_imports/pipelines/zendesk/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
+posthog/temporal/data_imports/pipelines/zendesk/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
+posthog/temporal/data_imports/pipelines/zendesk/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
+posthog/temporal/data_imports/pipelines/zendesk/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
+posthog/temporal/data_imports/pipelines/zendesk/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
+posthog/temporal/data_imports/pipelines/zendesk/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
+posthog/temporal/data_imports/pipelines/zendesk/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
+posthog/temporal/data_imports/pipelines/zendesk/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
 posthog/temporal/data_imports/pipelines/stripe/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
 posthog/temporal/data_imports/pipelines/stripe/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
 posthog/temporal/data_imports/pipelines/stripe/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
diff --git a/posthog/temporal/data_imports/pipelines/schemas.py b/posthog/temporal/data_imports/pipelines/schemas.py
index c6e41bed4fdd6..8089d1204e8b1 100644
--- a/posthog/temporal/data_imports/pipelines/schemas.py
+++ b/posthog/temporal/data_imports/pipelines/schemas.py
@@ -1,4 +1,8 @@
-from posthog.temporal.data_imports.pipelines.zendesk.settings import BASE_ENDPOINTS, SUPPORT_ENDPOINTS
+from posthog.temporal.data_imports.pipelines.zendesk.settings import (
+    BASE_ENDPOINTS,
+    SUPPORT_ENDPOINTS,
+    INCREMENTAL_ENDPOINTS as ZENDESK_INCREMENTAL_ENDPOINTS,
+)
 from posthog.warehouse.models import ExternalDataSource
 from posthog.temporal.data_imports.pipelines.stripe.settings import (
     ENDPOINTS as STRIPE_ENDPOINTS,
@@ -19,7 +23,7 @@
 PIPELINE_TYPE_INCREMENTAL_ENDPOINTS_MAPPING = {
     ExternalDataSource.Type.STRIPE: STRIPE_INCREMENTAL_ENDPOINTS,
     ExternalDataSource.Type.HUBSPOT: (),
-    ExternalDataSource.Type.ZENDESK: (),
+    ExternalDataSource.Type.ZENDESK: ZENDESK_INCREMENTAL_ENDPOINTS,
     ExternalDataSource.Type.POSTGRES: (),
     ExternalDataSource.Type.SNOWFLAKE: (),
 }
diff --git a/posthog/temporal/data_imports/pipelines/zendesk/__init__.py b/posthog/temporal/data_imports/pipelines/zendesk/__init__.py
new file mode 100644
index 0000000000000..47b47f546f367
--- /dev/null
+++ b/posthog/temporal/data_imports/pipelines/zendesk/__init__.py
@@ -0,0 +1,271 @@
+import dlt
+from dlt.sources.helpers.rest_client.paginators import BasePaginator
+from dlt.sources.helpers.requests import Response, Request
+from posthog.temporal.data_imports.pipelines.rest_source import RESTAPIConfig, rest_api_resources
+from posthog.temporal.data_imports.pipelines.rest_source.typing import EndpointResource
+from posthog.warehouse.models.external_table_definitions import get_dlt_mapping_for_external_table
+
+
+def get_resource(name: str, is_incremental: bool) -> EndpointResource:
+    resources: dict[str, EndpointResource] = {
+        "brands": {
+            "name": "brands",
+            "table_name": "brands",
+            "primary_key": "id",
+            "write_disposition": "merge",
+            "columns": get_dlt_mapping_for_external_table("zendesk_brands"),  # type: ignore
+            "endpoint": {
+                "data_selector": "brands",
+                "path": "/api/v2/brands",
+                "paginator": {
+                    "type": "json_response",
+                    "next_url_path": "links.next",
+                },
+                "params": {
+                    "page[size]": 100,
+                },
+            },
+        },
+        "organizations": {
+            "name": "organizations",
+            "table_name": "organizations",
+            "primary_key": "id",
+            "write_disposition": "merge",
+            "columns": get_dlt_mapping_for_external_table("zendesk_organizations"),  # type: ignore
+            "endpoint": {
+                "data_selector": "organizations",
+                "path": "/api/v2/organizations",
+                "paginator": {
+                    "type": "json_response",
+                    "next_url_path": "links.next",
+                },
+                "params": {
+                    "page[size]": 100,
+                },
+            },
+        },
+        "groups": {
+            "name": "groups",
+            "table_name": "groups",
+            "primary_key": "id",
+            "write_disposition": "merge",
+            "columns": get_dlt_mapping_for_external_table("zendesk_groups"),  # type: ignore
+            "endpoint": {
+                "data_selector": "groups",
+                "path": "/api/v2/groups",
+                "paginator": {
+                    "type": "json_response",
+                    "next_url_path": "links.next",
+                },
+                "params": {
+                    # the parameters below can optionally be configured
+                    # "exclude_deleted": "OPTIONAL_CONFIG",
+                    "page[size]": 100,
+                },
+            },
+        },
+        "sla_policies": {
+            "name": "sla_policies",
+            "table_name": "sla_policies",
+            "primary_key": "id",
+            "write_disposition": "merge",
+            "columns": get_dlt_mapping_for_external_table("zendesk_sla_policies"),  # type: ignore
+            "endpoint": {
+                "data_selector": "sla_policies",
+                "path": "/api/v2/slas/policies",
+                "paginator": {
+                    "type": "json_response",
+                    "next_url_path": "links.next",
+                },
+            },
+        },
+        "users": {
+            "name": "users",
+            "table_name": "users",
+            "primary_key": "id",
+            "write_disposition": "merge",
+            "columns": get_dlt_mapping_for_external_table("zendesk_users"),  # type: ignore
+            "endpoint": {
+                "data_selector": "users",
+                "path": "/api/v2/users",
+                "paginator": {
+                    "type": "json_response",
+                    "next_url_path": "links.next",
+                },
+                "params": {
+                    # the parameters below can optionally be configured
+                    # "role": "OPTIONAL_CONFIG",
+                    # "role[]": "OPTIONAL_CONFIG",
+                    # "permission_set": "OPTIONAL_CONFIG",
+                    # "external_id": "OPTIONAL_CONFIG",
+                    "page[size]": 100,
+                },
+            },
+        },
+        "ticket_fields": {
+            "name": "ticket_fields",
+            "table_name": "ticket_fields",
+            "primary_key": "id",
+            "write_disposition": "merge",
+            "columns": get_dlt_mapping_for_external_table("zendesk_ticket_fields"),  # type: ignore
+            "endpoint": {
+                "data_selector": "ticket_fields",
+                "path": "/api/v2/ticket_fields",
+                "paginator": {
+                    "type": "json_response",
+                    "next_url_path": "links.next",
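+                    # "json_response" pagination follows the next-page URL the API returns under links.next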
+                },
+                "params": {
+                    # the parameters below can optionally be configured
+                    # "locale": "OPTIONAL_CONFIG",
+                    # "creator": "OPTIONAL_CONFIG",
+                    "page[size]": 100,
+                },
+            },
+        },
+        "ticket_events": {
+            "name": "ticket_events",
+            "table_name": "ticket_events",
+            "primary_key": "id",
+            "write_disposition": "merge",
+            "columns": get_dlt_mapping_for_external_table("zendesk_ticket_events"),  # type: ignore
+            "endpoint": {
+                "data_selector": "ticket_events",
+                "path": "/api/v2/incremental/ticket_events?start_time=0",
+                "paginator": ZendeskIncrementalEndpointPaginator(),
+                "params": {
+                    "per_page": 1000,
+                    # Having to use `start_time` in the initial path until incrementality works
+                    # "start_time": 0,
+                    # Incrementality is disabled as we can't access end_time on the root object
+                    # "start_time": {
+                    #     "type": "incremental",
+                    #     "cursor_path": "end_time",
+                    #     "initial_value": 0,  # type: ignore
+                    # },
+                },
+            },
+        },
+        "tickets": {
+            "name": "tickets",
+            "table_name": "tickets",
+            "primary_key": "id",
+            "write_disposition": "merge",
+            "columns": get_dlt_mapping_for_external_table("zendesk_tickets"),  # type: ignore
+            "endpoint": {
+                "data_selector": "tickets",
+                "path": "/api/v2/incremental/tickets",
+                "paginator": ZendeskTicketsIncrementalEndpointPaginator(),
+                "params": {
+                    "per_page": 1000,
+                    "start_time": {
+                        "type": "incremental",
+                        "cursor_path": "generated_timestamp",
+                        "initial_value": 0,  # type: ignore
+                    }
+                    if is_incremental
+                    else None,
+                },
+            },
+        },
+        "ticket_metric_events": {
+            "name": "ticket_metric_events",
+            "table_name": "ticket_metric_events",
+            "primary_key": "id",
+            "write_disposition": "merge",
+            "columns": get_dlt_mapping_for_external_table("zendesk_ticket_metric_events"),  # type: ignore
+            "endpoint": {
+                "data_selector": "ticket_metric_events",
+                "path": "/api/v2/incremental/ticket_metric_events?start_time=0",
+                "paginator": ZendeskIncrementalEndpointPaginator(),
+                "params": {
+                    "per_page": 1000,
+                    # Having to use `start_time` in the initial path until incrementality works
+                    # "start_time": 0,
+                    # Incrementality is disabled as we can't access end_time on the root object
+                    # "start_time": {
+                    #     "type": "incremental",
+                    #     "cursor_path": "end_time",
+                    #     "initial_value": 0,  # type: ignore
+                    # },
+                },
+            },
+        },
+    }
+
+    return resources[name]
+
+
+class ZendeskTicketsIncrementalEndpointPaginator(BasePaginator):
+    def update_state(self, response: Response) -> None:
+        res = response.json()
+
+        self._next_start_time = None
+
+        if not res:
+            self._has_next_page = False
+            return
+
+        if not res["end_of_stream"]:
+            self._has_next_page = True
+
+            last_value_in_response = res["tickets"][-1]["generated_timestamp"]
+            self._next_start_time = last_value_in_response
+        else:
+            self._has_next_page = False
+
+    def update_request(self, request: Request) -> None:
+        if request.params is None:
+            request.params = {}
+
+        request.params["start_time"] = self._next_start_time
+
+
+class ZendeskIncrementalEndpointPaginator(BasePaginator):
+    def update_state(self, response: Response) -> None:
+        res = response.json()
+
+        self._next_page = None
+
+        if not res:
+            self._has_next_page = False
+            return
+
+        if not res["end_of_stream"]:
+            self._has_next_page = True
+
+            self._next_page = res["next_page"]
+        else:
+            self._has_next_page = False
+
+    def update_request(self, request: Request) -> None:
+        request.url = self._next_page
+
+
+@dlt.source(max_table_nesting=0)
+def zendesk_source(
+    subdomain: str,
+    api_key: str,
+    email_address: str,
+    endpoint: str,
+    team_id: int,
+    job_id: str,
+    is_incremental: bool = False,
+):
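+    # One run of this source materializes a single named resource: the REST client
+    # below targets the per-account subdomain and authenticates with HTTP Basic,
+    # using Zendesk's "{email}/token" username convention for API-token access.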
+    config: RESTAPIConfig = {
+        "client": {
+            "base_url": f"https://{subdomain}.zendesk.com/",
+            "auth": {
+                "type": "http_basic",
+                "username": f"{email_address}/token",
+                "password": api_key,
+            },
+        },
+        "resource_defaults": {
+            "primary_key": "id",
+            "write_disposition": "merge",
+        },
+        "resources": [get_resource(endpoint, is_incremental)],
+    }
+
+    yield from rest_api_resources(config, team_id, job_id)
diff --git a/posthog/temporal/data_imports/pipelines/zendesk/api_helpers.py b/posthog/temporal/data_imports/pipelines/zendesk/api_helpers.py
deleted file mode 100644
index c478060940d4f..0000000000000
--- a/posthog/temporal/data_imports/pipelines/zendesk/api_helpers.py
+++ /dev/null
@@ -1,103 +0,0 @@
-from typing import Optional, TypedDict
-
-from dlt.common import pendulum
-from dlt.common.time import ensure_pendulum_datetime
-from dlt.common.typing import DictStrAny, DictStrStr, TDataItem
-
-
-class TCustomFieldInfo(TypedDict):
-    title: str
-    options: DictStrStr
-
-
-def _parse_date_or_none(value: Optional[str]) -> Optional[pendulum.DateTime]:
-    if not value:
-        return None
-    return ensure_pendulum_datetime(value)
-
-
-def process_ticket(
-    ticket: DictStrAny,
-    custom_fields: dict[str, TCustomFieldInfo],
-    pivot_custom_fields: bool = True,
-) -> DictStrAny:
-    """
-    Helper function that processes a ticket object and returns a dictionary of ticket data.
-
-    Args:
-        ticket: The ticket dict object returned by a Zendesk API call.
-        custom_fields: A dictionary containing all the custom fields available for tickets.
-        pivot_custom_fields: A boolean indicating whether to pivot all custom fields or not.
-            Defaults to True.
-
-    Returns:
-        DictStrAny: A dictionary containing cleaned data about a ticket.
-    """
-    # Commented out due to how slow this processing code is, and how often it'd break the pipeline.
-    # to be revisited on whether we want/need this pre-processing and figure out the best way to do it.
-
-    # pivot custom field if indicated as such
-    # get custom fields
-    # pivoted_fields = set()
-    # for custom_field in ticket.get("custom_fields", []):
-    #     if pivot_custom_fields:
-    #         cus_field_id = str(custom_field["id"])
-    #         field = custom_fields.get(cus_field_id, None)
-    #         if field is None:
-    #             logger.warning(
-    #                 "Custom field with ID %s does not exist in fields state.
It may have been created after the pipeline run started.", - # cus_field_id, - # ) - # custom_field["ticket_id"] = ticket["id"] - # continue - - # pivoted_fields.add(cus_field_id) - # field_name = field["title"] - # current_value = custom_field["value"] - # options = field["options"] - # # Map dropdown values to labels - # if not current_value or not options: - # ticket[field_name] = current_value - # elif isinstance(current_value, list): # Multiple choice field has a list of values - # ticket[field_name] = [options.get(key, key) for key in current_value] - # else: - # ticket[field_name] = options.get(current_value) - # else: - # custom_field["ticket_id"] = ticket["id"] - # # delete fields that are not needed for pivoting - # if pivot_custom_fields: - # ticket["custom_fields"] = [f for f in ticket.get("custom_fields", []) if str(f["id"]) not in pivoted_fields] - # if not ticket.get("custom_fields"): - # del ticket["custom_fields"] - # del ticket["fields"] - - # modify dates to return datetime objects instead - ticket["updated_at"] = _parse_date_or_none(ticket["updated_at"]) - ticket["created_at"] = _parse_date_or_none(ticket["created_at"]) - ticket["due_at"] = _parse_date_or_none(ticket["due_at"]) - return ticket - - -def process_ticket_field(field: DictStrAny, custom_fields_state: dict[str, TCustomFieldInfo]) -> TDataItem: - """Update custom field mapping in dlt state for the given field.""" - # grab id and update state dict - # if the id is new, add a new key to indicate that this is the initial value for title - # New dropdown options are added to existing field but existing options are not changed - return_dict = field.copy() - field_id = str(field["id"]) - - options = field.get("custom_field_options", []) - new_options = {o["value"]: o["name"] for o in options} - existing_field = custom_fields_state.get(field_id) - if existing_field: - existing_options = existing_field["options"] - if return_options := return_dict.get("custom_field_options"): - for item in return_options: - item["name"] = existing_options.get(item["value"], item["name"]) - for key, value in new_options.items(): - if key not in existing_options: - existing_options[key] = value - else: - custom_fields_state[field_id] = {"title": field["title"], "options": new_options} - return_dict["initial_title"] = field["title"] - return return_dict diff --git a/posthog/temporal/data_imports/pipelines/zendesk/credentials.py b/posthog/temporal/data_imports/pipelines/zendesk/credentials.py deleted file mode 100644 index d056528059530..0000000000000 --- a/posthog/temporal/data_imports/pipelines/zendesk/credentials.py +++ /dev/null @@ -1,51 +0,0 @@ -""" -This module handles how credentials are read in dlt sources -""" - -from typing import ClassVar, Union -import dlt -from dlt.common.configuration import configspec -from dlt.common.configuration.specs import CredentialsConfiguration -from dlt.common.typing import TSecretValue - - -@configspec -class ZendeskCredentialsBase(CredentialsConfiguration): - """ - The Base version of all the ZendeskCredential classes. 
- """ - - subdomain: str - __config_gen_annotations__: ClassVar[list[str]] = [] - - -@configspec -class ZendeskCredentialsEmailPass(ZendeskCredentialsBase): - """ - This class is used to store credentials for Email + Password Authentication - """ - - email: str = "" - password: TSecretValue = dlt.secrets.value - - -@configspec -class ZendeskCredentialsOAuth(ZendeskCredentialsBase): - """ - This class is used to store credentials for OAuth Token Authentication - """ - - oauth_token: TSecretValue = dlt.secrets.value - - -@configspec -class ZendeskCredentialsToken(ZendeskCredentialsBase): - """ - This class is used to store credentials for Token Authentication - """ - - email: str = "" - token: TSecretValue = dlt.secrets.value - - -TZendeskCredentials = Union[ZendeskCredentialsEmailPass, ZendeskCredentialsToken, ZendeskCredentialsOAuth] diff --git a/posthog/temporal/data_imports/pipelines/zendesk/helpers.py b/posthog/temporal/data_imports/pipelines/zendesk/helpers.py deleted file mode 100644 index c29f41279a06b..0000000000000 --- a/posthog/temporal/data_imports/pipelines/zendesk/helpers.py +++ /dev/null @@ -1,451 +0,0 @@ -from typing import Optional -from collections.abc import Iterator, Iterable -from itertools import chain - -import dlt -from dlt.common import pendulum -from dlt.common.time import ensure_pendulum_datetime -from dlt.common.typing import TDataItem, TAnyDateTime, TDataItems -from dlt.sources import DltResource - -from posthog.temporal.common.logger import bind_temporal_worker_logger - - -from .api_helpers import process_ticket, process_ticket_field -from .talk_api import PaginationType, ZendeskAPIClient -from .credentials import TZendeskCredentials, ZendeskCredentialsOAuth - -from .settings import ( - DEFAULT_START_DATE, - CUSTOM_FIELDS_STATE_KEY, - SUPPORT_ENDPOINTS, - TALK_ENDPOINTS, - INCREMENTAL_TALK_ENDPOINTS, - SUPPORT_EXTRA_ENDPOINTS, -) - - -@dlt.source(max_table_nesting=0) -def zendesk_talk( - credentials: TZendeskCredentials = dlt.secrets.value, - start_date: Optional[TAnyDateTime] = DEFAULT_START_DATE, - end_date: Optional[TAnyDateTime] = None, -) -> Iterable[DltResource]: - """ - Retrieves data from Zendesk Talk for phone calls and voicemails. - - `start_date` argument can be used on its own or together with `end_date`. When both are provided - data is limited to items updated in that time range. - The range is "half-open", meaning elements equal and higher than `start_date` and elements lower than `end_date` are included. - All resources opt-in to use Airflow scheduler if run as Airflow task - - Args: - credentials: The credentials for authentication. Defaults to the value in the `dlt.secrets` object. - start_date: The start time of the range for which to load. Defaults to January 1st 2000. - end_date: The end time of the range for which to load data. - If end time is not provided, the incremental loading will be enabled and after initial run, only new data will be retrieved - Yields: - DltResource: Data resources from Zendesk Talk. 
- """ - - # use the credentials to authenticate with the ZendeskClient - zendesk_client = ZendeskAPIClient(credentials) - start_date_obj = ensure_pendulum_datetime(start_date) - end_date_obj = ensure_pendulum_datetime(end_date) if end_date else None - - # regular endpoints - for key, talk_endpoint, item_name, cursor_paginated in TALK_ENDPOINTS: - yield dlt.resource( - talk_resource( - zendesk_client, - key, - item_name or talk_endpoint, - PaginationType.CURSOR if cursor_paginated else PaginationType.OFFSET, - ), - name=key, - write_disposition="replace", - ) - - # adding incremental endpoints - for key, talk_incremental_endpoint in INCREMENTAL_TALK_ENDPOINTS.items(): - yield dlt.resource( - talk_incremental_resource, - name=f"{key}_incremental", - primary_key="id", - write_disposition="merge", - )( - zendesk_client=zendesk_client, - talk_endpoint_name=key, - talk_endpoint=talk_incremental_endpoint, - updated_at=dlt.sources.incremental[str]( - "updated_at", - initial_value=start_date_obj.isoformat(), - end_value=end_date_obj.isoformat() if end_date_obj else None, - allow_external_schedulers=True, - ), - ) - - -def talk_resource( - zendesk_client: ZendeskAPIClient, - talk_endpoint_name: str, - talk_endpoint: str, - pagination_type: PaginationType, -) -> Iterator[TDataItem]: - """ - Loads data from a Zendesk Talk endpoint. - - Args: - zendesk_client: An instance of ZendeskAPIClient for making API calls to Zendesk Talk. - talk_endpoint_name: The name of the talk_endpoint. - talk_endpoint: The actual URL ending of the endpoint. - pagination: Type of pagination type used by endpoint - - Yields: - TDataItem: Dictionary containing the data from the endpoint. - """ - # send query and process it - yield from zendesk_client.get_pages(talk_endpoint, talk_endpoint_name, pagination_type) - - -def talk_incremental_resource( - zendesk_client: ZendeskAPIClient, - talk_endpoint_name: str, - talk_endpoint: str, - updated_at: dlt.sources.incremental[str], -) -> Iterator[TDataItem]: - """ - Loads data from a Zendesk Talk endpoint with incremental loading. - - Args: - zendesk_client: An instance of ZendeskAPIClient for making API calls to Zendesk Talk. - talk_endpoint_name: The name of the talk_endpoint. - talk_endpoint: The actual URL ending of the endpoint. - updated_at: Source for the last updated timestamp. - - Yields: - TDataItem: Dictionary containing the data from the endpoint. - """ - # send the request and process it - for page in zendesk_client.get_pages( - talk_endpoint, - talk_endpoint_name, - PaginationType.START_TIME, - params={"start_time": ensure_pendulum_datetime(updated_at.last_value).int_timestamp}, - ): - yield page - if updated_at.end_out_of_range: - return - - -@dlt.source(max_table_nesting=0) -def zendesk_chat( - credentials: ZendeskCredentialsOAuth = dlt.secrets.value, - start_date: Optional[TAnyDateTime] = DEFAULT_START_DATE, - end_date: Optional[TAnyDateTime] = None, -) -> Iterable[DltResource]: - """ - Retrieves data from Zendesk Chat for chat interactions. - - `start_date` argument can be used on its own or together with `end_date`. When both are provided - data is limited to items updated in that time range. - The range is "half-open", meaning elements equal and higher than `start_date` and elements lower than `end_date` are included. - All resources opt-in to use Airflow scheduler if run as Airflow task - - Args: - credentials: The credentials for authentication. Defaults to the value in the `dlt.secrets` object. - start_date: The start time of the range for which to load. 
Defaults to January 1st 2000. - end_date: The end time of the range for which to load data. - If end time is not provided, the incremental loading will be enabled and after initial run, only new data will be retrieved - - Yields: - DltResource: Data resources from Zendesk Chat. - """ - - # Authenticate - zendesk_client = ZendeskAPIClient(credentials, url_prefix="https://www.zopim.com") - start_date_obj = ensure_pendulum_datetime(start_date) - end_date_obj = ensure_pendulum_datetime(end_date) if end_date else None - - yield dlt.resource(chats_table_resource, name="chats", write_disposition="merge")( - zendesk_client, - dlt.sources.incremental[str]( - "update_timestamp|updated_timestamp", - initial_value=start_date_obj.isoformat(), - end_value=end_date_obj.isoformat() if end_date_obj else None, - allow_external_schedulers=True, - ), - ) - - -def chats_table_resource( - zendesk_client: ZendeskAPIClient, - update_timestamp: dlt.sources.incremental[str], -) -> Iterator[TDataItems]: - """ - Resource for Chats - - Args: - zendesk_client: The Zendesk API client instance, used to make calls to Zendesk API. - update_timestamp: Incremental source specifying the timestamp for incremental loading. - - Yields: - dict: A dictionary representing each row of data. - """ - chat_pages = zendesk_client.get_pages( - "/api/v2/incremental/chats", - "chats", - PaginationType.START_TIME, - params={ - "start_time": ensure_pendulum_datetime(update_timestamp.last_value).int_timestamp, - "fields": "chats(*)", - }, - ) - for page in chat_pages: - yield page - - if update_timestamp.end_out_of_range: - return - - -@dlt.source(max_table_nesting=0) -def zendesk_support( - team_id: int, - credentials: TZendeskCredentials = dlt.secrets.value, - endpoints: tuple[str, ...] = (), - pivot_ticket_fields: bool = True, - start_date: Optional[TAnyDateTime] = DEFAULT_START_DATE, - end_date: Optional[TAnyDateTime] = None, -) -> Iterable[DltResource]: - """ - Retrieves data from Zendesk Support for tickets, users, brands, organizations, and groups. - - `start_date` argument can be used on its own or together with `end_date`. When both are provided - data is limited to items updated in that time range. - The range is "half-open", meaning elements equal and higher than `start_date` and elements lower than `end_date` are included. - All resources opt-in to use Airflow scheduler if run as Airflow task - - Args: - credentials: The credentials for authentication. Defaults to the value in the `dlt.secrets` object. - load_all: Whether to load extra resources for the API. Defaults to True. - pivot_ticket_fields: Whether to pivot the custom fields in tickets. Defaults to True. - start_date: The start time of the range for which to load. Defaults to January 1st 2000. - end_date: The end time of the range for which to load data. - If end time is not provided, the incremental loading will be enabled and after initial run, only new data will be retrieved - - Returns: - Sequence[DltResource]: Multiple dlt resources. 
- """ - - start_date_obj = ensure_pendulum_datetime(start_date) - end_date_obj = ensure_pendulum_datetime(end_date) if end_date else None - - start_date_ts = start_date_obj.int_timestamp - start_date_iso_str = start_date_obj.isoformat() - end_date_ts: Optional[int] = None - end_date_iso_str: Optional[str] = None - if end_date_obj: - end_date_ts = end_date_obj.int_timestamp - end_date_iso_str = end_date_obj.isoformat() - - @dlt.resource(name="ticket_events", primary_key="id", write_disposition="append") - async def ticket_events( - zendesk_client: ZendeskAPIClient, - timestamp: dlt.sources.incremental[int] = dlt.sources.incremental( # noqa: B008 - "timestamp", - initial_value=start_date_ts, - end_value=end_date_ts, - allow_external_schedulers=True, - ), - ): - # URL For ticket events - # 'https://d3v-dlthub.zendesk.com/api/v2/incremental/ticket_events.json?start_time=946684800' - logger = await bind_temporal_worker_logger(team_id) - logger.info("Zendesk: getting ticket_events") - - event_pages = zendesk_client.get_pages( - "/api/v2/incremental/ticket_events.json", - "ticket_events", - PaginationType.STREAM, - params={"start_time": timestamp.last_value}, - ) - for page in event_pages: - yield page - if timestamp.end_out_of_range: - return - - @dlt.resource( - name="tickets", - primary_key="id", - write_disposition="merge", - columns={ - "tags": {"data_type": "complex"}, - "custom_fields": {"data_type": "complex"}, - }, - ) - async def ticket_table( - zendesk_client: ZendeskAPIClient, - pivot_fields: bool = True, - updated_at: dlt.sources.incremental[pendulum.DateTime] = dlt.sources.incremental( # noqa: B008 - "updated_at", - initial_value=start_date_obj, - end_value=end_date_obj, - allow_external_schedulers=True, - ), - ): - """ - Resource for tickets table. Uses DLT state to handle column renaming of custom fields to prevent changing the names of said columns. - This resource uses pagination, loading and side loading to make API calls more efficient. - - Args: - zendesk_client: The Zendesk API client instance, used to make calls to Zendesk API. - pivot_fields: Indicates whether to pivot the custom fields in tickets. Defaults to True. - per_page: The number of Ticket objects to load per page. Defaults to 1000. - updated_at: Incremental source for the 'updated_at' column. - Defaults to dlt.sources.incremental("updated_at", initial_value=start_date). - - Yields: - TDataItem: Dictionary containing the ticket data. - """ - logger = await bind_temporal_worker_logger(team_id) - logger.info("Zendesk: getting tickets") - - fields_dict = dlt.current.source_state().setdefault(CUSTOM_FIELDS_STATE_KEY, {}) - # include_objects = ["users", "groups", "organisation", "brands"] - ticket_pages = zendesk_client.get_pages( - "/api/v2/incremental/tickets", - "tickets", - PaginationType.STREAM, - params={"start_time": updated_at.last_value.int_timestamp}, - ) - for page in ticket_pages: - yield [process_ticket(ticket, fields_dict, pivot_custom_fields=pivot_fields) for ticket in page] - - # stop loading when using end_value and end is reached - if updated_at.end_out_of_range: - return - - @dlt.resource(name="ticket_metric_events", primary_key="id", write_disposition="append") - async def ticket_metric_table( - zendesk_client: ZendeskAPIClient, - time: dlt.sources.incremental[str] = dlt.sources.incremental( # noqa: B008 - "time", - initial_value=start_date_iso_str, - end_value=end_date_iso_str, - allow_external_schedulers=True, - ), - ): - """ - Resource for ticket metric events table. 
Returns all the ticket metric events from the starting date, - with the default starting date being January 1st of the current year. - - Args: - zendesk_client: The Zendesk API client instance, used to make calls to Zendesk API. - time: Incremental source for the 'time' column, - indicating the starting date for retrieving ticket metric events. - Defaults to dlt.sources.incremental("time", initial_value=start_date_iso_str). - - Yields: - TDataItem: Dictionary containing the ticket metric event data. - """ - logger = await bind_temporal_worker_logger(team_id) - logger.info("Zendesk: getting ticket_metric_events") - - # "https://example.zendesk.com/api/v2/incremental/ticket_metric_events?start_time=1332034771" - metric_event_pages = zendesk_client.get_pages( - "/api/v2/incremental/ticket_metric_events", - "ticket_metric_events", - PaginationType.CURSOR, - params={ - "start_time": ensure_pendulum_datetime(time.last_value).int_timestamp, - }, - ) - for page in metric_event_pages: - yield page - - if time.end_out_of_range: - return - - async def ticket_fields_table(zendesk_client: ZendeskAPIClient): - """ - Loads ticket fields data from Zendesk API. - - Args: - zendesk_client: The Zendesk API client instance, used to make calls to Zendesk API. - - Yields: - TDataItem: Dictionary containing the ticket fields data. - """ - logger = await bind_temporal_worker_logger(team_id) - logger.info("Zendesk: getting ticket_fields") - - # get dlt state - ticket_custom_fields = dlt.current.source_state().setdefault(CUSTOM_FIELDS_STATE_KEY, {}) - # get all custom fields and update state if needed, otherwise just load dicts into tables - all_fields = list( - chain.from_iterable( - zendesk_client.get_pages("/api/v2/ticket_fields.json", "ticket_fields", PaginationType.OFFSET) - ) - ) - # all_fields = zendesk_client.ticket_fields() - for field in all_fields: - yield process_ticket_field(field, ticket_custom_fields) - - ticket_fields_resource = dlt.resource(name="ticket_fields", write_disposition="replace")(ticket_fields_table) - - # Authenticate - zendesk_client = ZendeskAPIClient(credentials) - - all_endpoints = SUPPORT_ENDPOINTS + SUPPORT_EXTRA_ENDPOINTS - - for endpoint in endpoints: - # loading base tables - if endpoint == "ticket_fields": - yield ticket_fields_resource(zendesk_client=zendesk_client) - elif endpoint == "ticket_events": - yield ticket_events(zendesk_client=zendesk_client) - elif endpoint == "tickets": - yield ticket_table(zendesk_client=zendesk_client, pivot_fields=pivot_ticket_fields) - elif endpoint == "ticket_metric_events": - yield ticket_metric_table(zendesk_client=zendesk_client) - else: - # other tables to be loaded - for resource, endpoint_url, data_key, cursor_paginated in all_endpoints: - if endpoint == resource: - yield dlt.resource( - basic_resource, - name=resource, - write_disposition="replace", - )(zendesk_client, endpoint_url, data_key or resource, cursor_paginated, team_id) - - break - - -async def basic_resource( - zendesk_client: ZendeskAPIClient, - endpoint_url: str, - data_key: str, - cursor_paginated: bool, - team_id: int, -): - """ - Basic loader for most endpoints offered by Zenpy. Supports pagination. Expects to be called as a DLT Resource. - - Args: - zendesk_client: The Zendesk API client instance, used to make calls to Zendesk API. - resource: The Zenpy endpoint to retrieve data from, usually directly linked to a Zendesk API endpoint. 
- cursor_paginated: Tells to use CURSOR pagination or OFFSET/no pagination - - Yields: - TDataItem: Dictionary containing the resource data. - """ - - logger = await bind_temporal_worker_logger(team_id) - logger.info(f"Zendesk: getting {endpoint_url}") - - pages = zendesk_client.get_pages( - endpoint_url, - data_key, - PaginationType.CURSOR if cursor_paginated else PaginationType.OFFSET, - ) - yield pages diff --git a/posthog/temporal/data_imports/pipelines/zendesk/settings.py b/posthog/temporal/data_imports/pipelines/zendesk/settings.py index aa44df7c20297..ddd75aaafaf41 100644 --- a/posthog/temporal/data_imports/pipelines/zendesk/settings.py +++ b/posthog/temporal/data_imports/pipelines/zendesk/settings.py @@ -11,6 +11,7 @@ # Resources that will always get pulled BASE_ENDPOINTS = ["ticket_fields", "ticket_events", "tickets", "ticket_metric_events"] +INCREMENTAL_ENDPOINTS = ["tickets"] # Tuples of (Resource name, endpoint URL, data_key, supports pagination) # data_key is the key which data list is nested under in responses diff --git a/posthog/temporal/data_imports/pipelines/zendesk/talk_api.py b/posthog/temporal/data_imports/pipelines/zendesk/talk_api.py deleted file mode 100644 index 4ebf375bf7050..0000000000000 --- a/posthog/temporal/data_imports/pipelines/zendesk/talk_api.py +++ /dev/null @@ -1,115 +0,0 @@ -from enum import Enum -from typing import Optional, Any -from collections.abc import Iterator -from dlt.common.typing import DictStrStr, TDataItems, TSecretValue -from dlt.sources.helpers.requests import client - -from . import settings -from .credentials import ( - ZendeskCredentialsEmailPass, - ZendeskCredentialsOAuth, - ZendeskCredentialsToken, - TZendeskCredentials, -) - - -class PaginationType(Enum): - OFFSET = 0 - CURSOR = 1 - STREAM = 2 - START_TIME = 3 - - -class ZendeskAPIClient: - """ - API client used to make requests to Zendesk talk, support and chat API - """ - - subdomain: str = "" - url: str = "" - headers: Optional[DictStrStr] - auth: Optional[tuple[str, TSecretValue]] - - def __init__(self, credentials: TZendeskCredentials, url_prefix: Optional[str] = None) -> None: - """ - Initializer for the API client which is then used to make API calls to the ZendeskAPI - - Args: - credentials: ZendeskCredentials object which contains the necessary credentials to authenticate to ZendeskAPI - """ - # oauth token is the preferred way to authenticate, followed by api token and then email + password combo - # fill headers and auth for every possibility of credentials given, raise error if credentials are of incorrect type - if isinstance(credentials, ZendeskCredentialsOAuth): - self.headers = {"Authorization": f"Bearer {credentials.oauth_token}"} - self.auth = None - elif isinstance(credentials, ZendeskCredentialsToken): - self.headers = None - self.auth = (f"{credentials.email}/token", credentials.token) - elif isinstance(credentials, ZendeskCredentialsEmailPass): - self.auth = (credentials.email, credentials.password) - self.headers = None - else: - raise TypeError( - "Wrong credentials type provided to ZendeskAPIClient. The credentials need to be of type: ZendeskCredentialsOAuth, ZendeskCredentialsToken or ZendeskCredentialsEmailPass" - ) - - # If url_prefix is set it overrides the default API URL (e.g. 
chat api uses zopim.com domain) - if url_prefix: - self.url = url_prefix - else: - self.subdomain = credentials.subdomain - self.url = f"https://{self.subdomain}.zendesk.com" - - def get_pages( - self, - endpoint: str, - data_point_name: str, - pagination: PaginationType, - params: Optional[dict[str, Any]] = None, - ) -> Iterator[TDataItems]: - """ - Makes a request to a paginated endpoint and returns a generator of data items per page. - - Args: - endpoint: The url to the endpoint, e.g. /api/v2/calls - data_point_name: The key which data items are nested under in the response object (e.g. calls) - params: Optional dict of query params to include in the request - pagination: Type of pagination type used by endpoint - - Returns: - Generator of pages, each page is a list of dict data items - """ - - # update the page size to enable cursor pagination - params = params or {} - if pagination == PaginationType.CURSOR: - params["page[size]"] = settings.PAGE_SIZE - elif pagination == PaginationType.STREAM: - params["per_page"] = settings.INCREMENTAL_PAGE_SIZE - elif pagination == PaginationType.START_TIME: - params["limit"] = settings.INCREMENTAL_PAGE_SIZE - - # make request and keep looping until there is no next page - get_url = f"{self.url}{endpoint}" - while get_url: - response = client.get(get_url, headers=self.headers, auth=self.auth, params=params) - response.raise_for_status() - response_json = response.json() - result = response_json[data_point_name] - yield result - - get_url = None - if pagination == PaginationType.CURSOR: - if response_json["meta"]["has_more"]: - get_url = response_json["links"]["next"] - elif pagination == PaginationType.OFFSET: - get_url = response_json.get("next_page", None) - elif pagination == PaginationType.STREAM: - # See https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/#json-format - if not response_json["end_of_stream"]: - get_url = response_json["next_page"] - elif pagination == PaginationType.START_TIME: - if response_json["count"] > 0: - get_url = response_json["next_page"] - - params = {} diff --git a/posthog/temporal/data_imports/workflow_activities/import_data.py b/posthog/temporal/data_imports/workflow_activities/import_data.py index f622c42bf7e8a..dcd1840622589 100644 --- a/posthog/temporal/data_imports/workflow_activities/import_data.py +++ b/posthog/temporal/data_imports/workflow_activities/import_data.py @@ -7,7 +7,6 @@ # TODO: remove dependency from posthog.temporal.data_imports.pipelines.helpers import aupdate_job_count -from posthog.temporal.data_imports.pipelines.zendesk.credentials import ZendeskCredentialsToken from posthog.temporal.data_imports.pipelines.pipeline import DataImportPipeline, PipelineInputs from posthog.warehouse.models import ( @@ -176,22 +175,18 @@ async def import_data_activity(inputs: ImportDataActivityInputs) -> tuple[TSchem return await _run(job_inputs=job_inputs, source=source, logger=logger, inputs=inputs, schema=schema) elif model.pipeline.source_type == ExternalDataSource.Type.ZENDESK: - from posthog.temporal.data_imports.pipelines.zendesk.helpers import zendesk_support + from posthog.temporal.data_imports.pipelines.zendesk import zendesk_source - # NOTE: this line errors on CI mypy but not locally. 
Putting arguments within the function causes the opposite error - credentials = ZendeskCredentialsToken( - token=model.pipeline.job_inputs.get("zendesk_api_key"), + source = zendesk_source( subdomain=model.pipeline.job_inputs.get("zendesk_subdomain"), - email=model.pipeline.job_inputs.get("zendesk_email_address"), + api_key=model.pipeline.job_inputs.get("zendesk_api_key"), + email_address=model.pipeline.job_inputs.get("zendesk_email_address"), + endpoint=schema.name, + team_id=inputs.team_id, + job_id=inputs.run_id, + is_incremental=schema.is_incremental, ) - data_support = zendesk_support(credentials=credentials, endpoints=tuple(endpoints), team_id=inputs.team_id) - # Uncomment to support zendesk chat and talk - # data_chat = zendesk_chat() - # data_talk = zendesk_talk() - - source = data_support - return await _run(job_inputs=job_inputs, source=source, logger=logger, inputs=inputs, schema=schema) else: raise ValueError(f"Source type {model.pipeline.source_type} not supported") diff --git a/posthog/temporal/tests/data_imports/conftest.py b/posthog/temporal/tests/data_imports/conftest.py index 1f43057da6e8a..2460ca45d0aaa 100644 --- a/posthog/temporal/tests/data_imports/conftest.py +++ b/posthog/temporal/tests/data_imports/conftest.py @@ -515,3 +515,384 @@ def stripe_subscription(): } """ ) + + +@pytest.fixture +def zendesk_brands(): + return json.loads( + """ + { + "brands": [ + { + "active": true, + "brand_url": "https://brand1.zendesk.com", + "created_at": "2019-08-06T02:43:39Z", + "default": true, + "has_help_center": true, + "help_center_state": "enabled", + "host_mapping": "brand1.com", + "id": 360002783572, + "is_deleted": false, + "logo": { + "content_type": "image/png", + "content_url": "https://company.zendesk.com/logos/brand1_logo.png", + "file_name": "brand1_logo.png", + "id": 928374, + "mapped_content_url": "https://company.com/logos/brand1_logo.png", + "size": 166144, + "thumbnails": [ + { + "content_type": "image/png", + "content_url": "https://company.zendesk.com/photos/brand1_logo_thumb.png", + "file_name": "brand1_logo_thumb.png", + "id": 928375, + "mapped_content_url": "https://company.com/photos/brand1_logo_thumb.png", + "size": 58298, + "url": "https://company.zendesk.com/api/v2/attachments/928375.json" + }, + { + "content_type": "image/png", + "content_url": "https://company.zendesk.com/photos/brand1_logo_small.png", + "file_name": "brand1_logo_small.png", + "id": 928376, + "mapped_content_url": "https://company.com/photos/brand1_logo_small.png", + "size": 58298, + "url": "https://company.zendesk.com/api/v2/attachments/928376.json" + } + ], + "url": "https://company.zendesk.com/api/v2/attachments/928374.json" + }, + "name": "Brand 1", + "signature_template": "{{agent.signature}}", + "subdomain": "hello-world", + "ticket_form_ids": [ + 360000660811 + ], + "updated_at": "2019-08-06T02:43:40Z", + "url": "https://company.zendesk.com/api/v2/brands/360002783572.json" + } + ], + "count": 1, + "next_page": null, + "previous_page": null + } + """ + ) + + +@pytest.fixture +def zendesk_organizations(): + return json.loads( + """ + { + "count": 1, + "next_page": null, + "organizations": [ + { + "created_at": "2018-11-14T00:14:52Z", + "details": "caterpillar =)", + "domain_names": [ + "remain.com" + ], + "external_id": "ABC198", + "group_id": 1835962, + "id": 4112492, + "name": "Groablet Enterprises", + "notes": "donkey", + "organization_fields": { + "datepudding": "2018-11-04T00:00:00+00:00", + "org_field_1": "happy happy", + "org_field_2": "teapot_kettle" + }, + 
"shared_comments": false, + "shared_tickets": false, + "tags": [ + "smiley", + "teapot_kettle" + ], + "updated_at": "2018-11-14T00:54:22Z", + "url": "https://example.zendesk.com/api/v2/organizations/4112492.json" + } + ], + "previous_page": null + } + """ + ) + + +@pytest.fixture +def zendesk_groups(): + return json.loads( + """ + { + "groups": [ + { + "id": 211, + "url": "https://test.zendesk.com/api/v2/groups/211.json", + "name": "DJs", + "description": "Peeps who DJ", + "default": false, + "is_public": true, + "deleted": true, + "created_at": "2009-05-13T00:07:08Z", + "updated_at": "2011-07-22T00:11:12Z" + } + ] + } + """ + ) + + +@pytest.fixture +def zendesk_sla_policies(): + return json.loads( + """ + { + "count": 1, + "next_page": null, + "previous_page": null, + "sla_policies": [ + { + "description": "For urgent incidents, we will respond to tickets in 10 minutes", + "filter": { + "all": [ + { + "field": "type", + "operator": "is", + "value": "incident" + }, + { + "field": "via_id", + "operator": "is", + "value": "4" + } + ], + "any": [] + }, + "id": 36, + "policy_metrics": [ + { + "business_hours": false, + "metric": "first_reply_time", + "priority": "low", + "target": 60 + } + ], + "position": 3, + "title": "Incidents", + "url": "https://{subdomain}.zendesk.com/api/v2/slas/policies/36.json" + } + ] + } + """ + ) + + +@pytest.fixture +def zendesk_users(): + return json.loads( + """ + { + "users": [ + { + "id": 1268829372990, + "url": "https://test.zendesk.com/api/v2/users/1268829372990.json", + "name": "Test", + "email": "test@posthog.com", + "created_at": "2022-04-25T19:42:18Z", + "updated_at": "2024-05-31T22:10:48Z", + "time_zone": "UTC", + "iana_time_zone": "Etc/UTC", + "phone": null, + "shared_phone_number": null, + "photo": null, + "locale_id": 1, + "locale": "en-US", + "organization_id": 1234568, + "role": "end-user", + "verified": true, + "external_id": null, + "tags": [], + "alias": "", + "active": true, + "shared": false, + "shared_agent": false, + "last_login_at": "2024-02-21T04:13:20Z", + "two_factor_auth_enabled": null, + "signature": null, + "details": "", + "notes": "", + "role_type": null, + "custom_role_id": null, + "moderator": false, + "ticket_restriction": "requested", + "only_private_comments": false, + "restricted_agent": true, + "suspended": false, + "default_group_id": null, + "report_csv": false, + "user_fields": { + "anonymize_data": null + } + } + ], + "next_page": null, + "previous_page": null, + "count": 1 + } + """ + ) + + +@pytest.fixture +def zendesk_ticket_fields(): + return json.loads( + """ + { + "ticket_fields": [ + { + "active": true, + "agent_description": "Agent only description", + "collapsed_for_agents": false, + "created_at": "2009-07-20T22:55:29Z", + "description": "This is the subject field of a ticket", + "editable_in_portal": true, + "id": 34, + "position": 21, + "raw_description": "This is the subject field of a ticket", + "raw_title": "{{dc.my_title}}", + "raw_title_in_portal": "{{dc.my_title_in_portal}}", + "regexp_for_validation": null, + "required": true, + "required_in_portal": true, + "tag": null, + "title": "Subject", + "title_in_portal": "Subject", + "type": "subject", + "updated_at": "2011-05-05T10:38:52Z", + "url": "https://company.zendesk.com/api/v2/ticket_fields/34.json", + "visible_in_portal": true + } + ] + } + """ + ) + + +@pytest.fixture +def zendesk_ticket_events(): + return json.loads( + """ + { + "count": 1, + "end_of_stream": true, + "end_time": 1601357503, + "next_page": 
"https://example.zendesk.com/api/v2/incremental/ticket_events.json?start_time=1601357503", + "ticket_events": [ + { + "id": 926256957613, + "instance_id": 1, + "metric": "agent_work_time", + "ticket_id": 155, + "time": "2020-10-26T12:53:12Z", + "type": "measure" + } + ] + } + """ + ) + + +@pytest.fixture +def zendesk_tickets(): + return json.loads( + """ + { + "count": 1, + "end_of_stream": true, + "end_time": 1390362485, + "next_page": "https://{subdomain}.zendesk.com/api/v2/incremental/tickets.json?per_page=3&start_time=1390362485", + "tickets": [ + { + "assignee_id": 235323, + "collaborator_ids": [ + 35334, + 234 + ], + "created_at": "2009-07-20T22:55:29Z", + "custom_fields": [ + { + "id": 27642, + "value": "745" + }, + { + "id": 27648, + "value": "yes" + } + ], + "description": "The fire is very colorful.", + "due_at": null, + "external_id": "ahg35h3jh", + "follower_ids": [ + 35334, + 234 + ], + "from_messaging_channel": false, + "group_id": 98738, + "has_incidents": false, + "id": 35436, + "organization_id": 509974, + "priority": "high", + "problem_id": 9873764, + "raw_subject": "{{dc.printer_on_fire}}", + "recipient": "support@company.com", + "requester_id": 20978392, + "satisfaction_rating": { + "comment": "Great support!", + "id": 1234, + "score": "good" + }, + "sharing_agreement_ids": [ + 84432 + ], + "status": "open", + "subject": "Help, my printer is on fire!", + "submitter_id": 76872, + "tags": [ + "enterprise", + "other_tag" + ], + "type": "incident", + "updated_at": "2011-05-05T10:38:52Z", + "url": "https://company.zendesk.com/api/v2/tickets/35436.json", + "via": { + "channel": "web" + } + } + ] + } + """ + ) + + +@pytest.fixture +def zendesk_ticket_metric_events(): + return json.loads( + """ + { + "count": 1, + "end_time": 1603716792, + "next_page": "https://company.zendesk.com/api/v2/incremental/ticket_metric_events.json?start_time=1603716792", + "ticket_metric_events": [ + { + "id": 926232157301, + "instance_id": 0, + "metric": "agent_work_time", + "ticket_id": 155, + "time": "2020-10-26T12:53:12Z", + "type": "measure" + } + ] + } + """ + ) diff --git a/posthog/temporal/tests/data_imports/test_end_to_end.py b/posthog/temporal/tests/data_imports/test_end_to_end.py index 2ac44d7443b9a..c0fce812b2196 100644 --- a/posthog/temporal/tests/data_imports/test_end_to_end.py +++ b/posthog/temporal/tests/data_imports/test_end_to_end.py @@ -194,3 +194,156 @@ async def test_stripe_subscription(team, stripe_subscription): job_inputs={"stripe_secret_key": "test-key", "stripe_account_id": "acct_id"}, mock_data_response=stripe_subscription["data"], ) + + +@pytest.mark.django_db(transaction=True) +@pytest.mark.asyncio +async def test_zendesk_brands(team, zendesk_brands): + await _run( + team=team, + schema_name="brands", + table_name="zendesk_brands", + source_type="Zendesk", + job_inputs={ + "zendesk_subdomain": "test", + "zendesk_api_key": "test_api_key", + "zendesk_email_address": "test@posthog.com", + }, + mock_data_response=zendesk_brands["brands"], + ) + + +@pytest.mark.django_db(transaction=True) +@pytest.mark.asyncio +async def test_zendesk_organizations(team, zendesk_organizations): + await _run( + team=team, + schema_name="organizations", + table_name="zendesk_organizations", + source_type="Zendesk", + job_inputs={ + "zendesk_subdomain": "test", + "zendesk_api_key": "test_api_key", + "zendesk_email_address": "test@posthog.com", + }, + mock_data_response=zendesk_organizations["organizations"], + ) + + +@pytest.mark.django_db(transaction=True) +@pytest.mark.asyncio +async def 
test_zendesk_groups(team, zendesk_groups): + await _run( + team=team, + schema_name="groups", + table_name="zendesk_groups", + source_type="Zendesk", + job_inputs={ + "zendesk_subdomain": "test", + "zendesk_api_key": "test_api_key", + "zendesk_email_address": "test@posthog.com", + }, + mock_data_response=zendesk_groups["groups"], + ) + + +@pytest.mark.django_db(transaction=True) +@pytest.mark.asyncio +async def test_zendesk_sla_policies(team, zendesk_sla_policies): + await _run( + team=team, + schema_name="sla_policies", + table_name="zendesk_sla_policies", + source_type="Zendesk", + job_inputs={ + "zendesk_subdomain": "test", + "zendesk_api_key": "test_api_key", + "zendesk_email_address": "test@posthog.com", + }, + mock_data_response=zendesk_sla_policies["sla_policies"], + ) + + +@pytest.mark.django_db(transaction=True) +@pytest.mark.asyncio +async def test_zendesk_users(team, zendesk_users): + await _run( + team=team, + schema_name="users", + table_name="zendesk_users", + source_type="Zendesk", + job_inputs={ + "zendesk_subdomain": "test", + "zendesk_api_key": "test_api_key", + "zendesk_email_address": "test@posthog.com", + }, + mock_data_response=zendesk_users["users"], + ) + + +@pytest.mark.django_db(transaction=True) +@pytest.mark.asyncio +async def test_zendesk_ticket_fields(team, zendesk_ticket_fields): + await _run( + team=team, + schema_name="ticket_fields", + table_name="zendesk_ticket_fields", + source_type="Zendesk", + job_inputs={ + "zendesk_subdomain": "test", + "zendesk_api_key": "test_api_key", + "zendesk_email_address": "test@posthog.com", + }, + mock_data_response=zendesk_ticket_fields["ticket_fields"], + ) + + +@pytest.mark.django_db(transaction=True) +@pytest.mark.asyncio +async def test_zendesk_ticket_events(team, zendesk_ticket_events): + await _run( + team=team, + schema_name="ticket_events", + table_name="zendesk_ticket_events", + source_type="Zendesk", + job_inputs={ + "zendesk_subdomain": "test", + "zendesk_api_key": "test_api_key", + "zendesk_email_address": "test@posthog.com", + }, + mock_data_response=zendesk_ticket_events["ticket_events"], + ) + + +@pytest.mark.django_db(transaction=True) +@pytest.mark.asyncio +async def test_zendesk_tickets(team, zendesk_tickets): + await _run( + team=team, + schema_name="tickets", + table_name="zendesk_tickets", + source_type="Zendesk", + job_inputs={ + "zendesk_subdomain": "test", + "zendesk_api_key": "test_api_key", + "zendesk_email_address": "test@posthog.com", + }, + mock_data_response=zendesk_tickets["tickets"], + ) + + +@pytest.mark.django_db(transaction=True) +@pytest.mark.asyncio +async def test_zendesk_ticket_metric_events(team, zendesk_ticket_metric_events): + await _run( + team=team, + schema_name="ticket_metric_events", + table_name="zendesk_ticket_metric_events", + source_type="Zendesk", + job_inputs={ + "zendesk_subdomain": "test", + "zendesk_api_key": "test_api_key", + "zendesk_email_address": "test@posthog.com", + }, + mock_data_response=zendesk_ticket_metric_events["ticket_metric_events"], + )
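
A note for readers tracing the new incremental logic: ZendeskTicketsIncrementalEndpointPaginator above advances the start_time cursor to the generated_timestamp of the last ticket in each page and stops when the incremental export reports end_of_stream, rather than when a page comes back empty. A minimal, self-contained sketch of that handshake, where _pages and fetch are hypothetical stand-ins for the authenticated GET /api/v2/incremental/tickets calls (not part of the patch):

# Illustrative sketch of the tickets cursor loop, reduced to plain Python.
_pages = [
    {"tickets": [{"id": 1, "generated_timestamp": 100}, {"id": 2, "generated_timestamp": 150}], "end_of_stream": False},
    {"tickets": [{"id": 3, "generated_timestamp": 210}], "end_of_stream": True},
]


def fetch(start_time: int) -> dict:
    # A real implementation would issue an authenticated HTTP request here.
    return _pages.pop(0)


start_time = 0  # matches the resource's incremental initial_value
while True:
    res = fetch(start_time)
    for ticket in res["tickets"]:
        print(ticket["id"], ticket["generated_timestamp"])
    if res["end_of_stream"]:
        break
    # Advance the cursor the way update_state() does: the last ticket's
    # generated_timestamp becomes the next request's start_time.
    start_time = res["tickets"][-1]["generated_timestamp"]

The same termination rule (end_of_stream, not an empty page) drives ZendeskIncrementalEndpointPaginator for ticket_events and ticket_metric_events, which follows the response's next_page URL instead of re-sending start_time.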