Skip to content

Commit

Permalink
feat(data-warehouse): DLT Zendesk conversion (#22876)
Browse files Browse the repository at this point in the history
* Added job cancellation checking into rest source

* Remove chunk size

* fixed mypy

* Fixed test

* fixed mypy

* Fixed test

* WIP

* Get the right zendesk tables set as incremental

* Final fixes and tests for zendesk

* Fixed mypy

* Updated incremental table names
  • Loading branch information
Gilbert09 authored Jun 18, 2024
1 parent cfafb9d commit d59313e
Show file tree
Hide file tree
Showing 11 changed files with 829 additions and 743 deletions.
17 changes: 9 additions & 8 deletions mypy-baseline.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ posthog/temporal/common/utils.py:0: error: Argument 1 to "abstractclassmethod" h
posthog/temporal/common/utils.py:0: note: This is likely because "from_activity" has named arguments: "cls". Consider marking them positional-only
posthog/temporal/common/utils.py:0: error: Argument 2 to "__get__" of "classmethod" has incompatible type "type[HeartbeatType]"; expected "type[Never]" [arg-type]
posthog/warehouse/models/ssh_tunnel.py:0: error: Incompatible types in assignment (expression has type "NoEncryption", variable has type "BestAvailableEncryption") [assignment]
posthog/temporal/data_imports/pipelines/zendesk/talk_api.py:0: error: Incompatible types in assignment (expression has type "None", variable has type "str") [assignment]
posthog/temporal/data_imports/pipelines/rest_source/config_setup.py:0: error: Dict entry 2 has incompatible type "Literal['auto']": "None"; expected "Literal['json_response', 'header_link', 'auto', 'single_page', 'cursor', 'offset', 'page_number']": "type[BasePaginator]" [dict-item]
posthog/temporal/data_imports/pipelines/rest_source/config_setup.py:0: error: Incompatible types in assignment (expression has type "None", variable has type "AuthConfigBase") [assignment]
posthog/temporal/data_imports/pipelines/rest_source/config_setup.py:0: error: Argument 1 to "get_auth_class" has incompatible type "Literal['bearer', 'api_key', 'http_basic'] | None"; expected "Literal['bearer', 'api_key', 'http_basic']" [arg-type]
Expand Down Expand Up @@ -550,13 +549,6 @@ posthog/api/test/test_exports.py:0: error: Incompatible types in assignment (exp
posthog/api/notebook.py:0: error: Incompatible types in assignment (expression has type "int", variable has type "str | None") [assignment]
posthog/warehouse/data_load/validate_schema.py:0: error: Incompatible types in assignment (expression has type "dict[str, dict[str, str | bool]] | dict[str, str]", variable has type "dict[str, dict[str, str]]") [assignment]
posthog/warehouse/api/table.py:0: error: Unsupported target for indexed assignment ("dict[str, str | bool] | str") [index]
posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "DateTime | Date | datetime | date | str | float | int | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type]
posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "str | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type]
posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "DateTime | Date | datetime | date | str | float | int | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type]
posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "str | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type]
posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "DateTime | Date | datetime | date | str | float | int | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type]
posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Item "None" of "DateTime | None" has no attribute "int_timestamp" [union-attr]
posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "str | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type]
posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Not all union combinations were tried because there are too many unions [misc]
posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument 2 to "source" has incompatible type "str | None"; expected "str" [arg-type]
posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument 3 to "source" has incompatible type "str | None"; expected "str" [arg-type]
Expand Down Expand Up @@ -668,6 +660,15 @@ posthog/temporal/tests/batch_exports/test_run_updates.py:0: error: Unused "type:
posthog/temporal/tests/batch_exports/test_run_updates.py:0: error: Unused "type: ignore" comment [unused-ignore]
posthog/temporal/tests/batch_exports/test_run_updates.py:0: error: Unused "type: ignore" comment [unused-ignore]
posthog/temporal/tests/batch_exports/test_batch_exports.py:0: error: TypedDict key must be a string literal; expected one of ("_timestamp", "created_at", "distinct_id", "elements", "elements_chain", ...) [literal-required]
posthog/temporal/data_imports/pipelines/zendesk/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
posthog/temporal/data_imports/pipelines/zendesk/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
posthog/temporal/data_imports/pipelines/zendesk/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
posthog/temporal/data_imports/pipelines/zendesk/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
posthog/temporal/data_imports/pipelines/zendesk/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
posthog/temporal/data_imports/pipelines/zendesk/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
posthog/temporal/data_imports/pipelines/zendesk/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
posthog/temporal/data_imports/pipelines/zendesk/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
posthog/temporal/data_imports/pipelines/zendesk/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
posthog/temporal/data_imports/pipelines/stripe/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
posthog/temporal/data_imports/pipelines/stripe/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
posthog/temporal/data_imports/pipelines/stripe/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
Expand Down
8 changes: 6 additions & 2 deletions posthog/temporal/data_imports/pipelines/schemas.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
from posthog.temporal.data_imports.pipelines.zendesk.settings import BASE_ENDPOINTS, SUPPORT_ENDPOINTS
from posthog.temporal.data_imports.pipelines.zendesk.settings import (
BASE_ENDPOINTS,
SUPPORT_ENDPOINTS,
INCREMENTAL_ENDPOINTS as ZENDESK_INCREMENTAL_ENDPOINTS,
)
from posthog.warehouse.models import ExternalDataSource
from posthog.temporal.data_imports.pipelines.stripe.settings import (
ENDPOINTS as STRIPE_ENDPOINTS,
Expand All @@ -19,7 +23,7 @@
PIPELINE_TYPE_INCREMENTAL_ENDPOINTS_MAPPING = {
ExternalDataSource.Type.STRIPE: STRIPE_INCREMENTAL_ENDPOINTS,
ExternalDataSource.Type.HUBSPOT: (),
ExternalDataSource.Type.ZENDESK: (),
ExternalDataSource.Type.ZENDESK: ZENDESK_INCREMENTAL_ENDPOINTS,
ExternalDataSource.Type.POSTGRES: (),
ExternalDataSource.Type.SNOWFLAKE: (),
}
271 changes: 271 additions & 0 deletions posthog/temporal/data_imports/pipelines/zendesk/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,271 @@
import dlt
from dlt.sources.helpers.rest_client.paginators import BasePaginator
from dlt.sources.helpers.requests import Response, Request
from posthog.temporal.data_imports.pipelines.rest_source import RESTAPIConfig, rest_api_resources
from posthog.temporal.data_imports.pipelines.rest_source.typing import EndpointResource
from posthog.warehouse.models.external_table_definitions import get_dlt_mapping_for_external_table


def get_resource(name: str, is_incremental: bool) -> EndpointResource:
    """Return the dlt REST-source endpoint configuration for one Zendesk resource.

    Args:
        name: Key of the resource to load (e.g. "brands", "tickets",
            "ticket_events").
        is_incremental: When True, the "tickets" endpoint is configured with a
            dlt incremental cursor on ``generated_timestamp``. The other
            endpoints visible here do not change with this flag.

    Returns:
        The ``EndpointResource`` mapping for ``name``.

    Raises:
        KeyError: If ``name`` is not one of the configured Zendesk resources.
    """
    resources: dict[str, EndpointResource] = {
        # Cursor-paginated "list" endpoints below use Zendesk's cursor
        # pagination (links.next) with the maximum page size of 100.
        "brands": {
            "name": "brands",
            "table_name": "brands",
            "primary_key": "id",
            "write_disposition": "merge",
            "columns": get_dlt_mapping_for_external_table("zendesk_brands"),  # type: ignore
            "endpoint": {
                "data_selector": "brands",
                "path": "/api/v2/brands",
                "paginator": {
                    "type": "json_response",
                    "next_url_path": "links.next",
                },
                "params": {
                    "page[size]": 100,
                },
            },
        },
        "organizations": {
            "name": "organizations",
            "table_name": "organizations",
            "primary_key": "id",
            "write_disposition": "merge",
            "columns": get_dlt_mapping_for_external_table("zendesk_organizations"),  # type: ignore
            "endpoint": {
                "data_selector": "organizations",
                "path": "/api/v2/organizations",
                "paginator": {
                    "type": "json_response",
                    "next_url_path": "links.next",
                },
                "params": {
                    "page[size]": 100,
                },
            },
        },
        "groups": {
            "name": "groups",
            "table_name": "groups",
            "primary_key": "id",
            "write_disposition": "merge",
            "columns": get_dlt_mapping_for_external_table("zendesk_groups"),  # type: ignore
            "endpoint": {
                "data_selector": "groups",
                "path": "/api/v2/groups",
                "paginator": {
                    "type": "json_response",
                    "next_url_path": "links.next",
                },
                "params": {
                    # the parameters below can optionally be configured
                    # "exclude_deleted": "OPTIONAL_CONFIG",
                    "page[size]": 100,
                },
            },
        },
        "sla_policies": {
            "name": "sla_policies",
            "table_name": "sla_policies",
            "primary_key": "id",
            "write_disposition": "merge",
            "columns": get_dlt_mapping_for_external_table("zendesk_sla_policies"),  # type: ignore
            "endpoint": {
                "data_selector": "sla_policies",
                "path": "/api/v2/slas/policies",
                "paginator": {
                    "type": "json_response",
                    "next_url_path": "links.next",
                },
            },
        },
        "users": {
            "name": "users",
            "table_name": "users",
            "primary_key": "id",
            "write_disposition": "merge",
            "columns": get_dlt_mapping_for_external_table("zendesk_users"),  # type: ignore
            "endpoint": {
                "data_selector": "users",
                "path": "/api/v2/users",
                "paginator": {
                    "type": "json_response",
                    "next_url_path": "links.next",
                },
                "params": {
                    # the parameters below can optionally be configured
                    # "role": "OPTIONAL_CONFIG",
                    # "role[]": "OPTIONAL_CONFIG",
                    # "permission_set": "OPTIONAL_CONFIG",
                    # "external_id": "OPTIONAL_CONFIG",
                    "page[size]": 100,
                },
            },
        },
        "ticket_fields": {
            "name": "ticket_fields",
            "table_name": "ticket_fields",
            "primary_key": "id",
            "write_disposition": "merge",
            "columns": get_dlt_mapping_for_external_table("zendesk_ticket_fields"),  # type: ignore
            "endpoint": {
                "data_selector": "ticket_fields",
                "path": "/api/v2/ticket_fields",
                "paginator": {
                    "type": "json_response",
                    "next_url_path": "links.next",
                },
                "params": {
                    # the parameters below can optionally be configured
                    # "locale": "OPTIONAL_CONFIG",
                    # "creator": "OPTIONAL_CONFIG",
                    "page[size]": 100,
                },
            },
        },
        # The endpoints below use Zendesk's incremental export API, which is
        # paginated by start_time / end_of_stream rather than cursor links.
        "ticket_events": {
            "name": "ticket_events",
            "table_name": "ticket_events",
            "primary_key": "id",
            "write_disposition": "merge",
            "columns": get_dlt_mapping_for_external_table("zendesk_ticket_events"),  # type: ignore
            "endpoint": {
                "data_selector": "ticket_events",
                "path": "/api/v2/incremental/ticket_events?start_time=0",
                "paginator": ZendeskIncrementalEndpointPaginator(),
                "params": {
                    "per_page": 1000,
                    # Having to use `start_time` in the initial path until incrementality works
                    # "start_time": 0,
                    # Incrementality is disabled as we can't access end_time on the root object
                    # "start_time": {
                    #     "type": "incremental",
                    #     "cursor_path": "end_time",
                    #     "initial_value": 0,  # type: ignore
                    # },
                },
            },
        },
        "tickets": {
            "name": "tickets",
            "table_name": "tickets",
            "primary_key": "id",
            "write_disposition": "merge",
            "columns": get_dlt_mapping_for_external_table("zendesk_tickets"),  # type: ignore
            "endpoint": {
                "data_selector": "tickets",
                "path": "/api/v2/incremental/tickets",
                "paginator": ZendeskTicketsIncrementalEndpointPaginator(),
                "params": {
                    "per_page": 1000,
                    # Only tickets supports a real incremental cursor here:
                    # generated_timestamp is present on each ticket record.
                    "start_time": {
                        "type": "incremental",
                        "cursor_path": "generated_timestamp",
                        "initial_value": 0,  # type: ignore
                    }
                    if is_incremental
                    else None,
                },
            },
        },
        "ticket_metric_events": {
            "name": "ticket_metric_events",
            "table_name": "ticket_metric_events",
            "primary_key": "id",
            "write_disposition": "merge",
            "columns": get_dlt_mapping_for_external_table("zendesk_ticket_metric_events"),  # type: ignore
            "endpoint": {
                "data_selector": "ticket_metric_events",
                "path": "/api/v2/incremental/ticket_metric_events?start_time=0",
                "paginator": ZendeskIncrementalEndpointPaginator(),
                "params": {
                    "per_page": 1000,
                    # Having to use `start_time` in the initial path until incrementality works
                    # "start_time": 0,
                    # Incrementality is disabled as we can't access end_time on the root object
                    # "start_time": {
                    #     "type": "incremental",
                    #     "cursor_path": "end_time",
                    #     "initial_value": 0,  # type: ignore
                    # },
                },
            },
        },
    }

    return resources[name]


class ZendeskTicketsIncrementalEndpointPaginator(BasePaginator):
    """Paginator for Zendesk's incremental tickets export endpoint.

    The /api/v2/incremental/tickets API has no cursor links: each follow-up
    request passes the ``generated_timestamp`` of the last ticket on the
    current page as the next request's ``start_time``. The response's
    ``end_of_stream`` flag marks the final page.
    """

    def update_state(self, response: Response) -> None:
        res = response.json()

        self._next_start_time = None

        # An empty/None body means there is nothing further to fetch.
        if not res:
            self._has_next_page = False
            return

        if not res["end_of_stream"]:
            tickets = res.get("tickets") or []
            if not tickets:
                # Defensive: Zendesk should not return an empty page while
                # end_of_stream is false, but if it does, stop paginating
                # rather than raising IndexError on tickets[-1].
                self._has_next_page = False
                return

            self._has_next_page = True
            # Resume from the newest ticket seen on this page.
            self._next_start_time = tickets[-1]["generated_timestamp"]
        else:
            self._has_next_page = False

    def update_request(self, request: Request) -> None:
        # Only called when _has_next_page is true, so _next_start_time is set.
        if request.params is None:
            request.params = {}

        request.params["start_time"] = self._next_start_time


class ZendeskIncrementalEndpointPaginator(BasePaginator):
    """Paginator for Zendesk incremental export endpoints with a ``next_page`` URL.

    Used for endpoints such as /api/v2/incremental/ticket_events: each response
    carries a fully-formed ``next_page`` URL and an ``end_of_stream`` flag that
    marks the final page.
    """

    def update_state(self, response: Response) -> None:
        res = response.json()

        self._next_page = None

        # An empty/None body means there is nothing further to fetch.
        if not res:
            self._has_next_page = False
            return

        if not res["end_of_stream"]:
            next_page = res.get("next_page")
            if not next_page:
                # Defensive: without a next_page URL we cannot build the next
                # request; stop rather than setting request.url to None.
                self._has_next_page = False
                return

            self._has_next_page = True
            self._next_page = next_page
        else:
            self._has_next_page = False

    def update_request(self, request: Request) -> None:
        # Only called when _has_next_page is true, so _next_page is a full URL.
        request.url = self._next_page


@dlt.source(max_table_nesting=0)
def zendesk_source(
    subdomain: str,
    api_key: str,
    email_address: str,
    endpoint: str,
    team_id: int,
    job_id: str,
    is_incremental: bool = False,
):
    """dlt source yielding resources for a single Zendesk endpoint.

    Args:
        subdomain: The account's Zendesk subdomain (``<subdomain>.zendesk.com``).
        api_key: Zendesk API token used as the basic-auth password.
        email_address: Account email; combined with "/token" for basic auth.
        endpoint: Name of the resource to sync (see ``get_resource``).
        team_id: PostHog team the data belongs to.
        job_id: External data job ID, used for cancellation checks.
        is_incremental: Whether to configure the endpoint for incremental sync.
    """
    # Zendesk API tokens authenticate over basic auth with "<email>/token"
    # as the username and the token itself as the password.
    client_config = {
        "base_url": f"https://{subdomain}.zendesk.com/",
        "auth": {
            "type": "http_basic",
            "username": f"{email_address}/token",
            "password": api_key,
        },
    }
    resource_defaults = {
        "primary_key": "id",
        "write_disposition": "merge",
    }

    config: RESTAPIConfig = {
        "client": client_config,
        "resource_defaults": resource_defaults,
        "resources": [get_resource(endpoint, is_incremental)],
    }

    yield from rest_api_resources(config, team_id, job_id)
Loading

0 comments on commit d59313e

Please sign in to comment.