diff --git a/frontend/public/services/vitally.png b/frontend/public/services/vitally.png new file mode 100644 index 0000000000000..867ed5e10e908 Binary files /dev/null and b/frontend/public/services/vitally.png differ diff --git a/frontend/src/scenes/data-warehouse/new/sourceWizardLogic.tsx b/frontend/src/scenes/data-warehouse/new/sourceWizardLogic.tsx index f385f82c5b402..5f4b9fbc628aa 100644 --- a/frontend/src/scenes/data-warehouse/new/sourceWizardLogic.tsx +++ b/frontend/src/scenes/data-warehouse/new/sourceWizardLogic.tsx @@ -567,6 +567,45 @@ export const SOURCE_DETAILS: Record = { ], caption: 'Select an existing Salesforce account to link to PostHog or create a new connection', }, + Vitally: { + name: 'Vitally', + fields: [ + { + name: 'secret_token', + label: 'Secret token', + type: 'text', + required: true, + placeholder: 'sk_live_...', + }, + { + type: 'select', + name: 'region', + label: 'Vitally region', + required: true, + defaultValue: 'EU', + options: [ + { + label: 'EU', + value: 'EU', + }, + { + label: 'US', + value: 'US', + fields: [ + { + name: 'subdomain', + label: 'Vitally subdomain', + type: 'text', + required: true, + placeholder: '', + }, + ], + }, + ], + }, + ], + caption: '', + }, } export const buildKeaFormDefaultFromSourceDetails = ( diff --git a/frontend/src/scenes/data-warehouse/settings/DataWarehouseManagedSourcesTable.tsx b/frontend/src/scenes/data-warehouse/settings/DataWarehouseManagedSourcesTable.tsx index d18657892fd22..4d1fc0f20b4cd 100644 --- a/frontend/src/scenes/data-warehouse/settings/DataWarehouseManagedSourcesTable.tsx +++ b/frontend/src/scenes/data-warehouse/settings/DataWarehouseManagedSourcesTable.tsx @@ -14,6 +14,7 @@ import IconSalesforce from 'public/services/salesforce.png' import IconSnowflake from 'public/services/snowflake.png' import IconMSSQL from 'public/services/sql-azure.png' import IconStripe from 'public/services/stripe.png' +import IconVitally from 'public/services/vitally.png' import IconZendesk from 'public/services/zendesk.png' import { urls } from 'scenes/urls' @@ -189,6 +190,7 @@ export function RenderDataWarehouseSourceIcon({ azure: Iconazure, Salesforce: IconSalesforce, MSSQL: IconMSSQL, + Vitally: IconVitally, }[type] return ( @@ -203,7 +205,7 @@ export function RenderDataWarehouseSourceIcon({ } > - {type} + {type} diff --git a/frontend/src/types.ts b/frontend/src/types.ts index a4e8187d755de..61d6072f17d21 100644 --- a/frontend/src/types.ts +++ b/frontend/src/types.ts @@ -3861,6 +3861,7 @@ export const externalDataSources = [ 'Zendesk', 'Snowflake', 'Salesforce', + 'Vitally', ] as const export type ExternalDataSourceType = (typeof externalDataSources)[number] diff --git a/latest_migrations.manifest b/latest_migrations.manifest index 85b48ed0ed16f..54f01686c7aee 100644 --- a/latest_migrations.manifest +++ b/latest_migrations.manifest @@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name ee: 0016_rolemembership_organization_member otp_static: 0002_throttling otp_totp: 0002_auto_20190420_0723 -posthog: 0465_datawarehouse_stripe_account +posthog: 0466_alter_externaldatasource_source_type sessions: 0001_initial social_django: 0010_uid_db_index two_factor: 0007_auto_20201201_1019 diff --git a/posthog/migrations/0466_alter_externaldatasource_source_type.py b/posthog/migrations/0466_alter_externaldatasource_source_type.py new file mode 100644 index 0000000000000..4a4b2f522f68b --- /dev/null +++ b/posthog/migrations/0466_alter_externaldatasource_source_type.py @@ -0,0 +1,30 @@ +# Generated by Django 4.2.15 on 2024-09-05 10:44 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("posthog", "0465_datawarehouse_stripe_account"), + ] + + operations = [ + migrations.AlterField( + model_name="externaldatasource", + name="source_type", + field=models.CharField( + choices=[ + ("Stripe", "Stripe"), + ("Hubspot", "Hubspot"), + ("Postgres", "Postgres"), + ("Zendesk", "Zendesk"), + ("Snowflake", "Snowflake"), + ("Salesforce", "Salesforce"), + ("MySQL", "MySQL"), + ("MSSQL", "MSSQL"), + ("Vitally", "Vitally"), + ], + max_length=128, + ), + ), + ] diff --git a/posthog/temporal/data_imports/pipelines/schemas.py b/posthog/temporal/data_imports/pipelines/schemas.py index 0acd00e8bd6f3..15214f939b78a 100644 --- a/posthog/temporal/data_imports/pipelines/schemas.py +++ b/posthog/temporal/data_imports/pipelines/schemas.py @@ -17,6 +17,11 @@ INCREMENTAL_ENDPOINTS as SALESFORCE_INCREMENTAL_ENDPOINTS, INCREMENTAL_FIELDS as SALESFORCE_INCREMENTAL_FIELDS, ) +from posthog.temporal.data_imports.pipelines.vitally.settings import ( + ENDPOINTS as VITALLY_ENDPOINTS, + INCREMENTAL_ENDPOINTS as VITALLY_INCREMENTAL_ENDPOINTS, + INCREMENTAL_FIELDS as VITALLY_INCREMENTAL_FIELDS, +) PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING = { ExternalDataSource.Type.STRIPE: STRIPE_ENDPOINTS, @@ -29,6 +34,7 @@ ExternalDataSource.Type.SALESFORCE: SALESFORCE_ENDPOINTS, ExternalDataSource.Type.MYSQL: (), ExternalDataSource.Type.MSSQL: (), + ExternalDataSource.Type.VITALLY: VITALLY_ENDPOINTS, } PIPELINE_TYPE_INCREMENTAL_ENDPOINTS_MAPPING = { @@ -40,6 +46,7 @@ ExternalDataSource.Type.SALESFORCE: SALESFORCE_INCREMENTAL_ENDPOINTS, ExternalDataSource.Type.MYSQL: (), ExternalDataSource.Type.MSSQL: (), + ExternalDataSource.Type.VITALLY: VITALLY_INCREMENTAL_ENDPOINTS, } PIPELINE_TYPE_INCREMENTAL_FIELDS_MAPPING: dict[ExternalDataSource.Type, dict[str, list[IncrementalField]]] = { @@ -51,4 +58,5 @@ ExternalDataSource.Type.SALESFORCE: SALESFORCE_INCREMENTAL_FIELDS, ExternalDataSource.Type.MYSQL: {}, ExternalDataSource.Type.MSSQL: {}, + ExternalDataSource.Type.VITALLY: VITALLY_INCREMENTAL_FIELDS, } diff --git a/posthog/temporal/data_imports/pipelines/vitally/__init__.py b/posthog/temporal/data_imports/pipelines/vitally/__init__.py new file mode 100644 index 0000000000000..4691f12a3d150 --- /dev/null +++ b/posthog/temporal/data_imports/pipelines/vitally/__init__.py @@ -0,0 +1,246 @@ +import base64 +from typing import Any, Optional +import dlt +from dlt.sources.helpers.rest_client.paginators import BasePaginator +from dlt.sources.helpers.requests import Response, Request +import requests +from posthog.temporal.data_imports.pipelines.rest_source import RESTAPIConfig, rest_api_resources +from posthog.temporal.data_imports.pipelines.rest_source.typing import EndpointResource + + +def get_resource(name: str, is_incremental: bool) -> EndpointResource: + resources: dict[str, EndpointResource] = { + "Organizations": { + "name": "Organizations", + "table_name": "organizations", + "primary_key": "id", + "write_disposition": { + "disposition": "merge", + "strategy": "upsert", + } + if is_incremental + else "replace", + "endpoint": { + "data_selector": "results", + "path": "/resources/organizations", + "params": {"limit": 100, "sortBy": "updatedAt"}, + }, + "table_format": "delta", + }, + "Accounts": { + "name": "Accounts", + "table_name": "accounts", + "primary_key": "id", + "write_disposition": { + "disposition": "merge", + "strategy": "upsert", + } + if is_incremental + else "replace", + "endpoint": { + "data_selector": "results", + "path": "/resources/accounts", + "params": {"limit": 100, "sortBy": "updatedAt"}, + }, + "table_format": "delta", + }, + "Users": { + "name": "Users", + "table_name": "users", + "primary_key": "id", + "write_disposition": { + "disposition": "merge", + "strategy": "upsert", + } + if is_incremental + else "replace", + "endpoint": { + "data_selector": "results", + "path": "/resources/users", + "params": {"limit": 100, "sortBy": "updatedAt"}, + }, + "table_format": "delta", + }, + "Conversations": { + "name": "Conversations", + "table_name": "conversations", + "primary_key": "id", + "write_disposition": { + "disposition": "merge", + "strategy": "upsert", + } + if is_incremental + else "replace", + "endpoint": { + "data_selector": "results", + "path": "/resources/conversations", + "params": {"limit": 100, "sortBy": "updatedAt"}, + }, + "table_format": "delta", + }, + "Notes": { + "name": "Notes", + "table_name": "notes", + "primary_key": "id", + "write_disposition": { + "disposition": "merge", + "strategy": "upsert", + } + if is_incremental + else "replace", + "endpoint": { + "data_selector": "results", + "path": "/resources/notes", + "params": {"limit": 100, "sortBy": "updatedAt"}, + }, + "table_format": "delta", + }, + "Projects": { + "name": "Projects", + "table_name": "projects", + "primary_key": "id", + "write_disposition": { + "disposition": "merge", + "strategy": "upsert", + } + if is_incremental + else "replace", + "endpoint": { + "data_selector": "results", + "path": "/resources/projects", + "params": {"limit": 100, "sortBy": "updatedAt"}, + }, + "table_format": "delta", + }, + "Tasks": { + "name": "Tasks", + "table_name": "tasks", + "primary_key": "id", + "write_disposition": { + "disposition": "merge", + "strategy": "upsert", + } + if is_incremental + else "replace", + "endpoint": { + "data_selector": "results", + "path": "/resources/tasks", + "params": {"limit": 100, "sortBy": "updatedAt"}, + }, + "table_format": "delta", + }, + "NPS_Responses": { + "name": "NPS_Responses", + "table_name": "nps_responses", + "primary_key": "id", + "write_disposition": { + "disposition": "merge", + "strategy": "upsert", + } + if is_incremental + else "replace", + "endpoint": { + "data_selector": "results", + "path": "/resources/npsResponses", + "params": {"limit": 100, "sortBy": "updatedAt"}, + }, + "table_format": "delta", + }, + "Custom_Objects": { + "name": "Custom_Objects", + "table_name": "custom_objects", + "primary_key": "id", + "write_disposition": { + "disposition": "merge", + "strategy": "upsert", + } + if is_incremental + else "replace", + "endpoint": { + "data_selector": "results", + "path": "/resources/customObjects", + "params": {"limit": 100, "sortBy": "updatedAt"}, + }, + "table_format": "delta", + }, + } + + return resources[name] + + +class VitallyPaginator(BasePaginator): + def __init__(self) -> None: + super().__init__() + + def update_state(self, response: Response, data: Optional[list[Any]] = None) -> None: + res = response.json() + + self._cursor = None + + if not res: + self._has_next_page = False + return + + if res["next"]: + self._has_next_page = True + self._cursor = res["next"] + else: + self._has_next_page = False + + def update_request(self, request: Request) -> None: + if request.params is None: + request.params = {} + + request.params["from"] = self._cursor + + +def get_base_url(region: str, subdomain: Optional[str]) -> str: + if region == "US" and subdomain: + return f"https://{subdomain}.rest.vitally.io/" + + return "https://rest.vitally-eu.io/" + + +@dlt.source(max_table_nesting=0) +def vitally_source( + secret_token: str, + region: str, + subdomain: Optional[str], + endpoint: str, + team_id: int, + job_id: str, + is_incremental: bool = False, +): + config: RESTAPIConfig = { + "client": { + "base_url": get_base_url(region, subdomain), + "auth": { + "type": "http_basic", + "username": secret_token, + "password": "", + }, + "paginator": VitallyPaginator(), + }, + "resource_defaults": { + "primary_key": "id", + "write_disposition": { + "disposition": "merge", + "strategy": "upsert", + } + if is_incremental + else "replace", + }, + "resources": [get_resource(endpoint, is_incremental)], + } + + yield from rest_api_resources(config, team_id, job_id) + + +def validate_credentials(secret_token: str, region: str, subdomain: Optional[str]) -> bool: + basic_token = base64.b64encode(f"{secret_token}:".encode("ascii")).decode("ascii") + res = requests.get( + f"{get_base_url(region, subdomain)}resources/users?limit=1", + headers={"Authorization": f"Basic {basic_token}"}, + ) + + return res.status_code == 200 diff --git a/posthog/temporal/data_imports/pipelines/vitally/settings.py b/posthog/temporal/data_imports/pipelines/vitally/settings.py new file mode 100644 index 0000000000000..fb715ad011032 --- /dev/null +++ b/posthog/temporal/data_imports/pipelines/vitally/settings.py @@ -0,0 +1,17 @@ +from posthog.warehouse.types import IncrementalField + +ENDPOINTS = ( + "Organizations", + "Accounts", + "Users", + "Conversations", + "Notes", + "Projects", + "Tasks", + "NPS_Responses", + "Custom_Objects", +) + +INCREMENTAL_ENDPOINTS = () + +INCREMENTAL_FIELDS: dict[str, list[IncrementalField]] = {} diff --git a/posthog/temporal/data_imports/workflow_activities/import_data.py b/posthog/temporal/data_imports/workflow_activities/import_data.py index 6ce4237f53711..73706e1191589 100644 --- a/posthog/temporal/data_imports/workflow_activities/import_data.py +++ b/posthog/temporal/data_imports/workflow_activities/import_data.py @@ -283,6 +283,27 @@ async def import_data_activity(inputs: ImportDataActivityInputs): is_incremental=schema.is_incremental, ) + return await _run( + job_inputs=job_inputs, + source=source, + logger=logger, + inputs=inputs, + schema=schema, + reset_pipeline=reset_pipeline, + ) + elif model.pipeline.source_type == ExternalDataSource.Type.VITALLY: + from posthog.temporal.data_imports.pipelines.vitally import vitally_source + + source = vitally_source( + secret_token=model.pipeline.job_inputs.get("secret_token"), + region=model.pipeline.job_inputs.get("region"), + subdomain=model.pipeline.job_inputs.get("subdomain"), + endpoint=schema.name, + team_id=inputs.team_id, + job_id=inputs.run_id, + is_incremental=schema.is_incremental, + ) + return await _run( job_inputs=job_inputs, source=source, diff --git a/posthog/warehouse/api/external_data_source.py b/posthog/warehouse/api/external_data_source.py index 2e3f66de9c630..24439fcecdc19 100644 --- a/posthog/warehouse/api/external_data_source.py +++ b/posthog/warehouse/api/external_data_source.py @@ -24,6 +24,7 @@ from posthog.hogql.database.database import create_hogql_database from posthog.temporal.data_imports.pipelines.stripe import validate_credentials as validate_stripe_credentials from posthog.temporal.data_imports.pipelines.zendesk import validate_credentials as validate_zendesk_credentials +from posthog.temporal.data_imports.pipelines.vitally import validate_credentials as validate_vitally_credentials from posthog.temporal.data_imports.pipelines.schemas import ( PIPELINE_TYPE_INCREMENTAL_ENDPOINTS_MAPPING, PIPELINE_TYPE_INCREMENTAL_FIELDS_MAPPING, @@ -280,6 +281,8 @@ def create(self, request: Request, *args: Any, **kwargs: Any) -> Response: new_source_model = self._handle_zendesk_source(request, *args, **kwargs) elif source_type == ExternalDataSource.Type.SALESFORCE: new_source_model = self._handle_salesforce_source(request, *args, **kwargs) + elif source_type == ExternalDataSource.Type.VITALLY: + new_source_model = self._handle_vitally_source(request, *args, **kwargs) elif source_type in [ ExternalDataSource.Type.POSTGRES, ExternalDataSource.Type.MYSQL, @@ -395,6 +398,28 @@ def _handle_stripe_source(self, request: Request, *args: Any, **kwargs: Any) -> return new_source_model + def _handle_vitally_source(self, request: Request, *args: Any, **kwargs: Any) -> ExternalDataSource: + payload = request.data["payload"] + secret_token = payload.get("secret_token") + region = payload.get("region") + subdomain = payload.get("subdomain", None) + prefix = request.data.get("prefix", None) + source_type = request.data["source_type"] + + # TODO: remove dummy vars + new_source_model = ExternalDataSource.objects.create( + source_id=str(uuid.uuid4()), + connection_id=str(uuid.uuid4()), + destination_id=str(uuid.uuid4()), + team=self.team, + status="Running", + source_type=source_type, + job_inputs={"secret_token": secret_token, "region": region, "subdomain": subdomain}, + prefix=prefix, + ) + + return new_source_model + def _handle_zendesk_source(self, request: Request, *args: Any, **kwargs: Any) -> ExternalDataSource: payload = request.data["payload"] api_key = payload.get("api_key") @@ -690,6 +715,15 @@ def database_schema(self, request: Request, *arg: Any, **kwargs: Any): status=status.HTTP_400_BAD_REQUEST, data={"message": "Invalid credentials: Zendesk credentials are incorrect"}, ) + elif source_type == ExternalDataSource.Type.VITALLY: + secret_token = request.data.get("secret_token", "") + region = request.data.get("region", "") + subdomain = request.data.get("subdomain", "") + if not validate_vitally_credentials(subdomain=subdomain, secret_token=secret_token, region=region): + return Response( + status=status.HTTP_400_BAD_REQUEST, + data={"message": "Invalid credentials: Zendesk credentials are incorrect"}, + ) # Get schemas and validate SQL credentials if source_type in [ diff --git a/posthog/warehouse/models/external_data_schema.py b/posthog/warehouse/models/external_data_schema.py index 83f16eaa9aa1f..a3ba7730aaaa3 100644 --- a/posthog/warehouse/models/external_data_schema.py +++ b/posthog/warehouse/models/external_data_schema.py @@ -90,7 +90,9 @@ def aget_schema_if_exists(schema_name: str, team_id: int, source_id: uuid.UUID) @database_sync_to_async def aget_schema_by_id(schema_id: str, team_id: int) -> ExternalDataSchema | None: - return ExternalDataSchema.objects.prefetch_related("source").get(id=schema_id, team_id=team_id) + return ( + ExternalDataSchema.objects.prefetch_related("source").exclude(deleted=True).get(id=schema_id, team_id=team_id) + ) @database_sync_to_async diff --git a/posthog/warehouse/models/external_data_source.py b/posthog/warehouse/models/external_data_source.py index 6f9fe14e01dd9..14dd7c99dd88c 100644 --- a/posthog/warehouse/models/external_data_source.py +++ b/posthog/warehouse/models/external_data_source.py @@ -23,6 +23,7 @@ class Type(models.TextChoices): SALESFORCE = "Salesforce", "Salesforce" MYSQL = "MySQL", "MySQL" MSSQL = "MSSQL", "MSSQL" + VITALLY = "Vitally", "Vitally" class Status(models.TextChoices): RUNNING = "Running", "Running"