Skip to content

Commit

Permalink
Added vitally Source
Browse files Browse the repository at this point in the history
  • Loading branch information
Gilbert09 committed Sep 5, 2024
1 parent 04ef403 commit e648e86
Show file tree
Hide file tree
Showing 13 changed files with 404 additions and 3 deletions.
Binary file added frontend/public/services/vitally.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
39 changes: 39 additions & 0 deletions frontend/src/scenes/data-warehouse/new/sourceWizardLogic.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,45 @@ export const SOURCE_DETAILS: Record<ExternalDataSourceType, SourceConfig> = {
],
caption: 'Select an existing Salesforce account to link to PostHog or create a new connection',
},
Vitally: {
name: 'Vitally',
fields: [
{
name: 'secret_token',
label: 'Secret token',
type: 'text',
required: true,
placeholder: 'sk_live_...',
},
{
type: 'select',
name: 'region',
label: 'Vitally region',
required: true,
defaultValue: 'EU',
options: [
{
label: 'EU',
value: 'EU',
},
{
label: 'US',
value: 'US',
fields: [
{
name: 'subdomain',
label: 'Vitally subdomain',
type: 'text',
required: true,
placeholder: '',
},
],
},
],
},
],
caption: '',
},
}

export const buildKeaFormDefaultFromSourceDetails = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import IconSalesforce from 'public/services/salesforce.png'
import IconSnowflake from 'public/services/snowflake.png'
import IconMSSQL from 'public/services/sql-azure.png'
import IconStripe from 'public/services/stripe.png'
import IconVitally from 'public/services/vitally.png'
import IconZendesk from 'public/services/zendesk.png'
import { urls } from 'scenes/urls'

Expand Down Expand Up @@ -189,6 +190,7 @@ export function RenderDataWarehouseSourceIcon({
azure: Iconazure,
Salesforce: IconSalesforce,
MSSQL: IconMSSQL,
Vitally: IconVitally,
}[type]

return (
Expand All @@ -203,7 +205,7 @@ export function RenderDataWarehouseSourceIcon({
}
>
<Link to={getDataWarehouseSourceUrl(type)}>
<img src={icon} alt={type} height={sizePx} width={sizePx} className="rounded" />
<img src={icon} alt={type} height={sizePx} width={sizePx} className="rounded object-contain" />
</Link>
</Tooltip>
</div>
Expand Down
1 change: 1 addition & 0 deletions frontend/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3861,6 +3861,7 @@ export const externalDataSources = [
'Zendesk',
'Snowflake',
'Salesforce',
'Vitally',
] as const

export type ExternalDataSourceType = (typeof externalDataSources)[number]
Expand Down
2 changes: 1 addition & 1 deletion latest_migrations.manifest
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name
ee: 0016_rolemembership_organization_member
otp_static: 0002_throttling
otp_totp: 0002_auto_20190420_0723
posthog: 0465_datawarehouse_stripe_account
posthog: 0466_alter_externaldatasource_source_type
sessions: 0001_initial
social_django: 0010_uid_db_index
two_factor: 0007_auto_20201201_1019
30 changes: 30 additions & 0 deletions posthog/migrations/0466_alter_externaldatasource_source_type.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Generated by Django 4.2.15 on 2024-09-05 10:44

from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
("posthog", "0465_datawarehouse_stripe_account"),
]

operations = [
migrations.AlterField(
model_name="externaldatasource",
name="source_type",
field=models.CharField(
choices=[
("Stripe", "Stripe"),
("Hubspot", "Hubspot"),
("Postgres", "Postgres"),
("Zendesk", "Zendesk"),
("Snowflake", "Snowflake"),
("Salesforce", "Salesforce"),
("MySQL", "MySQL"),
("MSSQL", "MSSQL"),
("Vitally", "Vitally"),
],
max_length=128,
),
),
]
8 changes: 8 additions & 0 deletions posthog/temporal/data_imports/pipelines/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@
INCREMENTAL_ENDPOINTS as SALESFORCE_INCREMENTAL_ENDPOINTS,
INCREMENTAL_FIELDS as SALESFORCE_INCREMENTAL_FIELDS,
)
from posthog.temporal.data_imports.pipelines.vitally.settings import (
ENDPOINTS as VITALLY_ENDPOINTS,
INCREMENTAL_ENDPOINTS as VITALLY_INCREMENTAL_ENDPOINTS,
INCREMENTAL_FIELDS as VITALLY_INCREMENTAL_FIELDS,
)

PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING = {
ExternalDataSource.Type.STRIPE: STRIPE_ENDPOINTS,
Expand All @@ -29,6 +34,7 @@
ExternalDataSource.Type.SALESFORCE: SALESFORCE_ENDPOINTS,
ExternalDataSource.Type.MYSQL: (),
ExternalDataSource.Type.MSSQL: (),
ExternalDataSource.Type.VITALLY: VITALLY_ENDPOINTS,
}

PIPELINE_TYPE_INCREMENTAL_ENDPOINTS_MAPPING = {
Expand All @@ -40,6 +46,7 @@
ExternalDataSource.Type.SALESFORCE: SALESFORCE_INCREMENTAL_ENDPOINTS,
ExternalDataSource.Type.MYSQL: (),
ExternalDataSource.Type.MSSQL: (),
ExternalDataSource.Type.VITALLY: VITALLY_INCREMENTAL_ENDPOINTS,
}

PIPELINE_TYPE_INCREMENTAL_FIELDS_MAPPING: dict[ExternalDataSource.Type, dict[str, list[IncrementalField]]] = {
Expand All @@ -51,4 +58,5 @@
ExternalDataSource.Type.SALESFORCE: SALESFORCE_INCREMENTAL_FIELDS,
ExternalDataSource.Type.MYSQL: {},
ExternalDataSource.Type.MSSQL: {},
ExternalDataSource.Type.VITALLY: VITALLY_INCREMENTAL_FIELDS,
}
246 changes: 246 additions & 0 deletions posthog/temporal/data_imports/pipelines/vitally/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
import base64
from typing import Any, Optional
import dlt
from dlt.sources.helpers.rest_client.paginators import BasePaginator
from dlt.sources.helpers.requests import Response, Request
import requests
from posthog.temporal.data_imports.pipelines.rest_source import RESTAPIConfig, rest_api_resources
from posthog.temporal.data_imports.pipelines.rest_source.typing import EndpointResource


def get_resource(name: str, is_incremental: bool) -> EndpointResource:
resources: dict[str, EndpointResource] = {
"Organizations": {
"name": "Organizations",
"table_name": "organizations",
"primary_key": "id",
"write_disposition": {
"disposition": "merge",
"strategy": "upsert",
}
if is_incremental
else "replace",
"endpoint": {
"data_selector": "results",
"path": "/resources/organizations",
"params": {"limit": 100, "sortBy": "updatedAt"},
},
"table_format": "delta",
},
"Accounts": {
"name": "Accounts",
"table_name": "accounts",
"primary_key": "id",
"write_disposition": {
"disposition": "merge",
"strategy": "upsert",
}
if is_incremental
else "replace",
"endpoint": {
"data_selector": "results",
"path": "/resources/accounts",
"params": {"limit": 100, "sortBy": "updatedAt"},
},
"table_format": "delta",
},
"Users": {
"name": "Users",
"table_name": "users",
"primary_key": "id",
"write_disposition": {
"disposition": "merge",
"strategy": "upsert",
}
if is_incremental
else "replace",
"endpoint": {
"data_selector": "results",
"path": "/resources/users",
"params": {"limit": 100, "sortBy": "updatedAt"},
},
"table_format": "delta",
},
"Conversations": {
"name": "Conversations",
"table_name": "conversations",
"primary_key": "id",
"write_disposition": {
"disposition": "merge",
"strategy": "upsert",
}
if is_incremental
else "replace",
"endpoint": {
"data_selector": "results",
"path": "/resources/conversations",
"params": {"limit": 100, "sortBy": "updatedAt"},
},
"table_format": "delta",
},
"Notes": {
"name": "Notes",
"table_name": "notes",
"primary_key": "id",
"write_disposition": {
"disposition": "merge",
"strategy": "upsert",
}
if is_incremental
else "replace",
"endpoint": {
"data_selector": "results",
"path": "/resources/notes",
"params": {"limit": 100, "sortBy": "updatedAt"},
},
"table_format": "delta",
},
"Projects": {
"name": "Projects",
"table_name": "projects",
"primary_key": "id",
"write_disposition": {
"disposition": "merge",
"strategy": "upsert",
}
if is_incremental
else "replace",
"endpoint": {
"data_selector": "results",
"path": "/resources/projects",
"params": {"limit": 100, "sortBy": "updatedAt"},
},
"table_format": "delta",
},
"Tasks": {
"name": "Tasks",
"table_name": "tasks",
"primary_key": "id",
"write_disposition": {
"disposition": "merge",
"strategy": "upsert",
}
if is_incremental
else "replace",
"endpoint": {
"data_selector": "results",
"path": "/resources/tasks",
"params": {"limit": 100, "sortBy": "updatedAt"},
},
"table_format": "delta",
},
"NPS_Responses": {
"name": "NPS_Responses",
"table_name": "nps_responses",
"primary_key": "id",
"write_disposition": {
"disposition": "merge",
"strategy": "upsert",
}
if is_incremental
else "replace",
"endpoint": {
"data_selector": "results",
"path": "/resources/npsResponses",
"params": {"limit": 100, "sortBy": "updatedAt"},
},
"table_format": "delta",
},
"Custom_Objects": {
"name": "Custom_Objects",
"table_name": "custom_objects",
"primary_key": "id",
"write_disposition": {
"disposition": "merge",
"strategy": "upsert",
}
if is_incremental
else "replace",
"endpoint": {
"data_selector": "results",
"path": "/resources/customObjects",
"params": {"limit": 100, "sortBy": "updatedAt"},
},
"table_format": "delta",
},
}

return resources[name]


class VitallyPaginator(BasePaginator):
def __init__(self) -> None:
super().__init__()

def update_state(self, response: Response, data: Optional[list[Any]] = None) -> None:
res = response.json()

self._cursor = None

if not res:
self._has_next_page = False
return

if res["next"]:
self._has_next_page = True
self._cursor = res["next"]
else:
self._has_next_page = False

def update_request(self, request: Request) -> None:
if request.params is None:
request.params = {}

request.params["from"] = self._cursor


def get_base_url(region: str, subdomain: Optional[str]) -> str:
if region == "US" and subdomain:
return f"https://{subdomain}.rest.vitally.io/"

return "https://rest.vitally-eu.io/"


@dlt.source(max_table_nesting=0)
def vitally_source(
secret_token: str,
region: str,
subdomain: Optional[str],
endpoint: str,
team_id: int,
job_id: str,
is_incremental: bool = False,
):
config: RESTAPIConfig = {
"client": {
"base_url": get_base_url(region, subdomain),
"auth": {
"type": "http_basic",
"username": secret_token,
"password": "",
},
"paginator": VitallyPaginator(),
},
"resource_defaults": {
"primary_key": "id",
"write_disposition": {
"disposition": "merge",
"strategy": "upsert",
}
if is_incremental
else "replace",
},
"resources": [get_resource(endpoint, is_incremental)],
}

yield from rest_api_resources(config, team_id, job_id)


def validate_credentials(secret_token: str, region: str, subdomain: Optional[str]) -> bool:
basic_token = base64.b64encode(f"{secret_token}:".encode("ascii")).decode("ascii")
res = requests.get(
f"{get_base_url(region, subdomain)}resources/users?limit=1",
headers={"Authorization": f"Basic {basic_token}"},
)

return res.status_code == 200
Loading

0 comments on commit e648e86

Please sign in to comment.