diff --git a/ee/billing/quota_limiting.py b/ee/billing/quota_limiting.py
index f91811d866a40..e5f3d641b5364 100644
--- a/ee/billing/quota_limiting.py
+++ b/ee/billing/quota_limiting.py
@@ -134,7 +134,7 @@ def org_quota_limited_until(
     if posthoganalytics.feature_enabled(
         QUOTA_LIMIT_DATA_RETENTION_FLAG,
-        organization.id,
+        str(organization.id),
         groups={"organization": str(organization.id)},
         group_properties={"organization": {"id": str(organization.id)}},
     ):
diff --git a/ee/billing/test/test_quota_limiting.py b/ee/billing/test/test_quota_limiting.py
index 926e3441c4f73..fedf7b15a54ec 100644
--- a/ee/billing/test/test_quota_limiting.py
+++ b/ee/billing/test/test_quota_limiting.py
@@ -69,9 +69,9 @@ def test_quota_limiting_feature_flag_enabled(self, patch_feature_enabled, patch_
         quota_limited_orgs, quota_limiting_suspended_orgs = update_all_org_billing_quotas()
         patch_feature_enabled.assert_called_with(
             QUOTA_LIMIT_DATA_RETENTION_FLAG,
-            self.organization.id,
+            str(self.organization.id),
             groups={"organization": org_id},
-            group_properties={"organization": {"id": org_id}},
+            group_properties={"organization": {"id": str(org_id)}},
         )
         patch_capture.assert_called_once_with(
             org_id,
@@ -101,7 +101,7 @@ def test_quota_limiting_feature_flag_enabled(self, patch_feature_enabled, patch_
         quota_limited_orgs, quota_limiting_suspended_orgs = update_all_org_billing_quotas()
         patch_feature_enabled.assert_called_with(
             QUOTA_LIMIT_DATA_RETENTION_FLAG,
-            self.organization.id,
+            str(self.organization.id),
             groups={"organization": org_id},
             group_properties={"organization": {"id": org_id}},
         )
diff --git a/ee/session_recordings/session_recording_playlist.py b/ee/session_recordings/session_recording_playlist.py
index 28d3353c0576f..8947e1c270ee4 100644
--- a/ee/session_recordings/session_recording_playlist.py
+++ b/ee/session_recordings/session_recording_playlist.py
@@ -258,7 +258,7 @@ def modify_recordings(
             return response.Response({"success": True})

         if request.method == "DELETE":
-            playlist_item = SessionRecordingPlaylistItem.objects.get(playlist=playlist, recording=session_recording_id)  # type: ignore
+            playlist_item = SessionRecordingPlaylistItem.objects.get(playlist=playlist, recording=session_recording_id)

             if playlist_item:
                 playlist_item.delete()
diff --git a/frontend/public/services/vitally.png b/frontend/public/services/vitally.png
new file mode 100644
index 0000000000000..867ed5e10e908
Binary files /dev/null and b/frontend/public/services/vitally.png differ
diff --git a/frontend/src/scenes/data-warehouse/new/sourceWizardLogic.tsx b/frontend/src/scenes/data-warehouse/new/sourceWizardLogic.tsx
index f385f82c5b402..5f4b9fbc628aa 100644
--- a/frontend/src/scenes/data-warehouse/new/sourceWizardLogic.tsx
+++ b/frontend/src/scenes/data-warehouse/new/sourceWizardLogic.tsx
@@ -567,6 +567,45 @@ export const SOURCE_DETAILS: Record = {
         ],
         caption: 'Select an existing Salesforce account to link to PostHog or create a new connection',
     },
+    Vitally: {
+        name: 'Vitally',
+        fields: [
+            {
+                name: 'secret_token',
+                label: 'Secret token',
+                type: 'text',
+                required: true,
+                placeholder: 'sk_live_...',
+            },
+            {
+                type: 'select',
+                name: 'region',
+                label: 'Vitally region',
+                required: true,
+                defaultValue: 'EU',
+                options: [
+                    {
+                        label: 'EU',
+                        value: 'EU',
+                    },
+                    {
+                        label: 'US',
+                        value: 'US',
+                        fields: [
+                            {
+                                name: 'subdomain',
+                                label: 'Vitally subdomain',
+                                type: 'text',
+                                required: true,
+                                placeholder: '',
+                            },
+                        ],
+                    },
+                ],
+            },
+        ],
+        caption: '',
+    },
 }

 export const buildKeaFormDefaultFromSourceDetails = (
diff --git a/frontend/src/scenes/data-warehouse/settings/DataWarehouseManagedSourcesTable.tsx b/frontend/src/scenes/data-warehouse/settings/DataWarehouseManagedSourcesTable.tsx
index d18657892fd22..4d1fc0f20b4cd 100644
--- a/frontend/src/scenes/data-warehouse/settings/DataWarehouseManagedSourcesTable.tsx
+++ b/frontend/src/scenes/data-warehouse/settings/DataWarehouseManagedSourcesTable.tsx
@@ -14,6 +14,7 @@ import IconSalesforce from 'public/services/salesforce.png'
 import IconSnowflake from 'public/services/snowflake.png'
 import IconMSSQL from 'public/services/sql-azure.png'
 import IconStripe from 'public/services/stripe.png'
+import IconVitally from 'public/services/vitally.png'
 import IconZendesk from 'public/services/zendesk.png'
 import { urls } from 'scenes/urls'

@@ -189,6 +190,7 @@ export function RenderDataWarehouseSourceIcon({
         azure: Iconazure,
         Salesforce: IconSalesforce,
         MSSQL: IconMSSQL,
+        Vitally: IconVitally,
     }[type]

     return (
@@ -203,7 +205,7 @@
                 }
             >
-                {type}
+                {type}
diff --git a/frontend/src/types.ts b/frontend/src/types.ts
index a4e8187d755de..61d6072f17d21 100644
--- a/frontend/src/types.ts
+++ b/frontend/src/types.ts
@@ -3861,6 +3861,7 @@ export const externalDataSources = [
     'Zendesk',
     'Snowflake',
     'Salesforce',
+    'Vitally',
 ] as const

 export type ExternalDataSourceType = (typeof externalDataSources)[number]
diff --git a/latest_migrations.manifest b/latest_migrations.manifest
index 85b48ed0ed16f..54f01686c7aee 100644
--- a/latest_migrations.manifest
+++ b/latest_migrations.manifest
@@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name
 ee: 0016_rolemembership_organization_member
 otp_static: 0002_throttling
 otp_totp: 0002_auto_20190420_0723
-posthog: 0465_datawarehouse_stripe_account
+posthog: 0466_alter_externaldatasource_source_type
 sessions: 0001_initial
 social_django: 0010_uid_db_index
 two_factor: 0007_auto_20201201_1019
diff --git a/mypy-baseline.txt b/mypy-baseline.txt
index 9e25498b357dd..149978d16ec83 100644
--- a/mypy-baseline.txt
+++ b/mypy-baseline.txt
@@ -42,6 +42,15 @@ posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argume
 posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Incompatible default for argument "resolved_param" (default has type "ResolvedParam | None", argument has type "ResolvedParam") [assignment]
 posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
 posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument "module" to "SourceInfo" has incompatible type Module | None; expected Module [arg-type]
+posthog/temporal/data_imports/pipelines/vitally/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
+posthog/temporal/data_imports/pipelines/vitally/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
+posthog/temporal/data_imports/pipelines/vitally/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
+posthog/temporal/data_imports/pipelines/vitally/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
+posthog/temporal/data_imports/pipelines/vitally/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
+posthog/temporal/data_imports/pipelines/vitally/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
+posthog/temporal/data_imports/pipelines/vitally/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
+posthog/temporal/data_imports/pipelines/vitally/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
+posthog/temporal/data_imports/pipelines/vitally/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
 posthog/utils.py:0: error: No overload variant of "asdict" matches argument type "type[DataclassInstance]" [call-overload]
 posthog/utils.py:0: note: Possible overload variants:
 posthog/utils.py:0: note: def asdict(obj: DataclassInstance) -> dict[str, Any]
@@ -103,8 +112,6 @@ posthog/models/filters/base_filter.py:0: error: "HogQLContext" has no attribute
 posthog/models/team/team.py:0: error: Statement is unreachable [unreachable]
 posthog/models/team/team.py:0: error: Statement is unreachable [unreachable]
 posthog/models/hog_functions/hog_function.py:0: error: Argument 1 to "get" of "dict" has incompatible type "str | None"; expected "str" [arg-type]
-posthog/models/hog_functions/hog_function.py:0: error: Argument 2 to "get_hog_function_status" has incompatible type "UUID"; expected "str" [arg-type]
-posthog/models/hog_functions/hog_function.py:0: error: Argument 2 to "patch_hog_function_status" has incompatible type "UUID"; expected "str" [arg-type]
 posthog/models/user.py:0: error: Incompatible types in assignment (expression has type "type[User]", base class "BaseManager" defined the type as "type[_T]") [assignment]
 posthog/models/user.py:0: error: Cannot override class variable (previously declared on base class "AbstractBaseUser") with instance variable [misc]
 posthog/models/user.py:0: error: Incompatible types in assignment (expression has type "None", base class "AbstractUser" defined the type as "CharField[str | int | Combinable, str]") [assignment]
@@ -250,7 +257,6 @@ posthog/hogql/printer.py:0: error: "FieldOrTable" has no attribute "name" [attr
 posthog/hogql/printer.py:0: error: "FieldOrTable" has no attribute "name" [attr-defined]
 posthog/hogql/printer.py:0: error: Argument 2 to "_get_materialized_column" of "_Printer" has incompatible type "str | int"; expected "str" [arg-type]
 posthog/hogql/printer.py:0: error: Argument 1 to "_print_identifier" of "_Printer" has incompatible type "str | None"; expected "str" [arg-type]
-posthog/user_permissions.py:0: error: Key expression in dictionary comprehension has incompatible type "UUID"; expected type "int" [misc]
 posthog/user_permissions.py:0: error: Incompatible return value type (got "int", expected "Level | None") [return-value]
 posthog/user_permissions.py:0: error: Incompatible return value type (got "int", expected "Level | None") [return-value]
 posthog/user_permissions.py:0: error: Incompatible return value type (got "int", expected "RestrictionLevel") [return-value]
@@ -258,7 +264,6 @@ posthog/tasks/update_survey_iteration.py:0: error: Incompatible types in assignm
 posthog/tasks/update_survey_iteration.py:0: error: Item "None" of "FeatureFlag | None" has no attribute "filters" [union-attr]
 posthog/tasks/update_survey_iteration.py:0: error: Item "None" of "FeatureFlag | None" has no attribute "filters" [union-attr]
 posthog/tasks/update_survey_iteration.py:0: error: Item "None" of "FeatureFlag | None" has no attribute "save" [union-attr]
-posthog/tasks/update_survey_iteration.py:0: error: Incompatible type for "key" of "FeatureFlag" (got "UUID", expected "str | int | Combinable") [misc]
 posthog/permissions.py:0: error: Argument 2 to "feature_enabled" has incompatible type "str | None"; expected "str" [arg-type]
 posthog/models/event/util.py:0: error: Incompatible types in assignment (expression has type "str", variable has type "datetime") [assignment]
 posthog/models/event/util.py:0: error: Module has no attribute "utc" [attr-defined]
@@ -279,7 +284,6 @@ posthog/demo/matrix/matrix.py:0: error: Name "timezone.datetime" is not defined
 posthog/demo/matrix/matrix.py:0: error: Name "timezone.datetime" is not defined [name-defined]
 posthog/demo/matrix/matrix.py:0: error: Name "timezone.datetime" is not defined [name-defined]
 posthog/demo/matrix/matrix.py:0: error: Name "timezone.datetime" is not defined [name-defined]
 posthog/api/shared.py:0: error: Incompatible return value type (got "int | None", expected "Level | None") [return-value]
-ee/billing/quota_limiting.py:0: error: Argument 2 to "feature_enabled" has incompatible type "UUID"; expected "str" [arg-type]
 ee/billing/quota_limiting.py:0: error: Unsupported target for indexed assignment ("object") [index]
 ee/billing/quota_limiting.py:0: error: "object" has no attribute "get" [attr-defined]
 ee/billing/quota_limiting.py:0: error: Unsupported target for indexed assignment ("object") [index]
@@ -325,7 +329,6 @@ posthog/models/property/util.py:0: error: Argument 1 to "append" of "list" has i
 posthog/models/property/util.py:0: error: Argument 1 to "append" of "list" has incompatible type "str | int"; expected "str" [arg-type]
 posthog/api/utils.py:0: error: Incompatible types in assignment (expression has type "type[EventDefinition]", variable has type "type[EnterpriseEventDefinition]") [assignment]
 posthog/api/utils.py:0: error: Argument 1 to "UUID" has incompatible type "int | str"; expected "str | None" [arg-type]
-posthog/api/email_verification.py:0: error: Argument 2 to "feature_enabled" has incompatible type "UUID"; expected "str" [arg-type]
 posthog/queries/trends/util.py:0: error: Argument 1 to "translate_hogql" has incompatible type "str | None"; expected "str" [arg-type]
 posthog/hogql/property.py:0: error: Incompatible type for lookup 'id': (got "str | int | list[str]", expected "str | int") [misc]
 posthog/hogql/property.py:0: error: Incompatible type for lookup 'pk': (got "str | float", expected "str | int") [misc]
@@ -369,18 +372,12 @@ ee/clickhouse/queries/funnels/funnel_correlation.py:0: error: Statement is unrea
 posthog/api/insight.py:0: error: Argument 1 to has incompatible type "*tuple[str, ...]"; expected "type[BaseRenderer]" [arg-type]
 posthog/api/dashboards/dashboard.py:0: error: Argument 1 to "dashboard_queryset" of "DashboardTile" has incompatible type "DashboardTile_RelatedManager"; expected "QuerySet[Any, Any]" [arg-type]
 posthog/api/person.py:0: error: Argument 1 to has incompatible type "*tuple[str, ...]"; expected "type[BaseRenderer]" [arg-type]
-posthog/api/person.py:0: error: Argument "organization_id" to "log_activity" has incompatible type "UUID"; expected "UUIDT | None" [arg-type]
 posthog/api/person.py:0: error: Argument 1 to "loads" has incompatible type "str | None"; expected "str | bytes | bytearray" [arg-type]
-posthog/api/person.py:0: error: Argument "organization_id" to "log_activity" has incompatible type "UUID"; expected "UUIDT | None" [arg-type]
 posthog/api/person.py:0: error: Argument "user" to "log_activity" has incompatible type "User | AnonymousUser"; expected "User | None" [arg-type]
-posthog/api/person.py:0: error: Argument "organization_id" to "log_activity" has incompatible type "UUID"; expected "UUIDT | None" [arg-type]
 posthog/api/person.py:0: error: Argument "user" to "log_activity" has incompatible type "User | AnonymousUser"; expected "User | None" [arg-type]
-posthog/api/person.py:0: error: Argument "organization_id" to "log_activity" has incompatible type "UUID"; expected "UUIDT | None" [arg-type]
 posthog/api/person.py:0: error: Cannot determine type of "group_properties_filter_group" [has-type]
 posthog/caching/insight_caching_state.py:0: error: Argument "params" to "execute" of "CursorWrapper" has incompatible type "list[object]"; expected "Sequence[bool | int | float | Decimal | str | <6 more items> | None] | Mapping[str, bool | int | float | Decimal | str | <6 more items> | None] | None" [arg-type]
 posthog/api/cohort.py:0: error: Incompatible type for lookup 'pk': (got "str | int | list[str]", expected "str | int") [misc]
-posthog/api/cohort.py:0: error: Argument "organization_id" to "log_activity" has incompatible type "UUID"; expected "UUIDT | None" [arg-type]
-posthog/api/cohort.py:0: error: Argument "organization_id" to "log_activity" has incompatible type "UUID"; expected "UUIDT | None" [arg-type]
 posthog/api/cohort.py:0: error: Incompatible type for lookup 'pk': (got "str | int | list[str]", expected "str | int") [misc]
 posthog/caching/insights_api.py:0: error: Unsupported operand types for >= ("datetime" and "None") [operator]
 posthog/caching/insights_api.py:0: note: Right operand is of type "datetime | None"
@@ -388,8 +385,6 @@ posthog/api/feature_flag.py:0: error: Item "Sequence[Any]" of "Any | Sequence[An
 posthog/api/feature_flag.py:0: error: Item "None" of "Any | Sequence[Any] | None" has no attribute "filters" [union-attr]
 posthog/api/feature_flag.py:0: error: Incompatible type for lookup 'pk': (got "str | int | list[str]", expected "str | int") [misc]
 posthog/api/feature_flag.py:0: error: Argument 2 to "get_all_feature_flags" has incompatible type "str | None"; expected "str" [arg-type]
-posthog/api/feature_flag.py:0: error: Argument "organization_id" to "log_activity" has incompatible type "UUID"; expected "UUIDT | None" [arg-type]
-posthog/api/feature_flag.py:0: error: Argument "organization_id" to "log_activity" has incompatible type "UUID"; expected "UUIDT | None" [arg-type]
 posthog/hogql_queries/web_analytics/web_analytics_query_runner.py:0: error: Module "django.utils.timezone" does not explicitly export attribute "datetime" [attr-defined]
 posthog/hogql_queries/web_analytics/web_analytics_query_runner.py:0: error: Argument 1 to "append" of "list" has incompatible type "EventPropertyFilter"; expected "Expr" [arg-type]
 posthog/hogql_queries/insights/trends/trends_query_runner.py:0: error: Module "django.utils.timezone" does not explicitly export attribute "datetime" [attr-defined]
@@ -413,8 +408,6 @@ posthog/hogql_queries/insights/lifecycle_query_runner.py:0: error: Item "SelectU
 posthog/hogql_queries/insights/lifecycle_query_runner.py:0: error: Item "None" of "JoinExpr | Any | None" has no attribute "sample" [union-attr]
 posthog/hogql_queries/insights/funnels/funnels_query_runner.py:0: error: Module "django.utils.timezone" does not explicitly export attribute "datetime" [attr-defined]
 posthog/api/survey.py:0: error: Incompatible types in assignment (expression has type "Any | Sequence[Any] | None", variable has type "Survey | None") [assignment]
-posthog/api/survey.py:0: error: Argument "item_id" to "log_activity" has incompatible type "UUID"; expected "int | str | UUIDT | None" [arg-type]
-posthog/api/survey.py:0: error: Argument "organization_id" to "log_activity" has incompatible type "UUID"; expected "UUIDT | None" [arg-type]
 posthog/api/survey.py:0: error: Item "list[_ErrorFullDetails]" of "_FullDetailDict | list[_ErrorFullDetails] | dict[str, _ErrorFullDetails]" has no attribute "get" [union-attr]
 posthog/api/survey.py:0: error: Item "object" of "object | Any" has no attribute "__iter__" (not iterable) [union-attr]
 posthog/hogql_queries/web_analytics/web_overview.py:0: error: Module "django.utils.timezone" does not explicitly export attribute "datetime" [attr-defined]
@@ -445,9 +438,6 @@ posthog/test/test_feature_flag_analytics.py:0: error: Item "None" of "Dashboard
 posthog/test/test_feature_flag_analytics.py:0: error: Item "None" of "Dashboard | None" has no attribute "tiles" [union-attr]
 posthog/test/test_feature_flag_analytics.py:0: error: Item "None" of "Dashboard | None" has no attribute "tiles" [union-attr]
 posthog/test/test_feature_flag_analytics.py:0: error: Item "None" of "Dashboard | None" has no attribute "delete" [union-attr]
-posthog/test/activity_logging/test_activity_logging.py:0: error: Argument "organization_id" to "log_activity" has incompatible type "UUID"; expected "UUIDT | None" [arg-type]
-posthog/test/activity_logging/test_activity_logging.py:0: error: Argument "organization_id" to "log_activity" has incompatible type "UUID"; expected "UUIDT | None" [arg-type]
-posthog/test/activity_logging/test_activity_logging.py:0: error: Argument "organization_id" to "log_activity" has incompatible type "UUID"; expected "UUIDT | None" [arg-type]
 posthog/temporal/data_imports/pipelines/zendesk/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
 posthog/temporal/data_imports/pipelines/zendesk/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
 posthog/temporal/data_imports/pipelines/zendesk/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
@@ -607,7 +597,6 @@ posthog/api/organization_feature_flag.py:0: error: Invalid index type "str | Non
 posthog/api/organization_feature_flag.py:0: error: Invalid index type "str | None" for "dict[str, int]"; expected type "str" [index]
 posthog/api/organization_feature_flag.py:0: error: Invalid index type "str | None" for "dict[str, int]"; expected type "str" [index]
 posthog/api/notebook.py:0: error: Incompatible types in assignment (expression has type "int", variable has type "str | None") [assignment]
-posthog/api/exports.py:0: error: Argument "organization_id" to "log_activity" has incompatible type "UUID"; expected "UUIDT | None" [arg-type]
 posthog/warehouse/data_load/validate_schema.py:0: error: Incompatible types in assignment (expression has type "object", variable has type "DataWarehouseCredential | Combinable | None") [assignment]
 posthog/warehouse/data_load/validate_schema.py:0: error: Incompatible types in assignment (expression has type "object", variable has type "str | int | Combinable") [assignment]
 posthog/warehouse/data_load/validate_schema.py:0: error: Incompatible types in assignment (expression has type "dict[str, dict[str, str | bool]] | dict[str, str]", variable has type "dict[str, dict[str, str]]") [assignment]
@@ -778,9 +767,6 @@ posthog/temporal/tests/batch_exports/test_run_updates.py:0: error: Unused "type:
 posthog/temporal/tests/batch_exports/test_run_updates.py:0: error: Unused "type: ignore" comment [unused-ignore]
 posthog/temporal/tests/batch_exports/test_run_updates.py:0: error: Unused "type: ignore" comment [unused-ignore]
 posthog/temporal/tests/batch_exports/test_batch_exports.py:0: error: TypedDict key must be a string literal; expected one of ("_timestamp", "created_at", "distinct_id", "elements", "elements_chain", ...) [literal-required]
-posthog/tasks/test/test_email.py:0: error: Argument 1 to "send_batch_export_run_failure" has incompatible type "UUID"; expected "str" [arg-type]
-posthog/tasks/test/test_email.py:0: error: Argument 1 to "send_batch_export_run_failure" has incompatible type "UUID"; expected "str" [arg-type]
-posthog/tasks/test/test_email.py:0: error: Argument 1 to "send_batch_export_run_failure" has incompatible type "UUID"; expected "str" [arg-type]
 posthog/session_recordings/session_recording_api.py:0: error: Argument "team_id" to "get_realtime_snapshots" has incompatible type "int"; expected "str" [arg-type]
 posthog/session_recordings/session_recording_api.py:0: error: Value of type variable "SupportsRichComparisonT" of "sorted" cannot be "str | None" [type-var]
 posthog/session_recordings/session_recording_api.py:0: error: Argument 1 to "get" of "dict" has incompatible type "str | None"; expected "str" [arg-type]
@@ -822,10 +808,8 @@ posthog/warehouse/external_data_source/source.py:0: error: Argument 1 to "_creat
 posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py:0: error: Incompatible types in assignment (expression has type "str | int", variable has type "int") [assignment]
 posthog/api/sharing.py:0: error: Item "None" of "list[Any] | None" has no attribute "__iter__" (not iterable) [union-attr]
 posthog/api/plugin.py:0: error: Item "None" of "Team | None" has no attribute "organization" [union-attr]
-posthog/api/plugin.py:0: error: Argument "organization_id" to "log_activity" has incompatible type "UUID | Any"; expected "UUIDT | None" [arg-type]
 posthog/api/plugin.py:0: error: Item "None" of "Team | None" has no attribute "id" [union-attr]
 posthog/api/plugin.py:0: error: Item "None" of "Team | None" has no attribute "organization" [union-attr]
-posthog/api/plugin.py:0: error: Argument "organization_id" to "log_activity" has incompatible type "UUID | Any"; expected "UUIDT | None" [arg-type]
 posthog/api/plugin.py:0: error: Item "None" of "Team | None" has no attribute "id" [union-attr]
 posthog/api/plugin.py:0: error: Incompatible types in assignment (expression has type "str | None", variable has type "str | int | Combinable") [assignment]
 posthog/api/plugin.py:0: error: Incompatible types in assignment (expression has type "str | None", variable has type "str | int | Combinable") [assignment]
@@ -835,10 +819,7 @@ posthog/api/plugin.py:0: error: Incompatible type for "file_name" of "PluginAtta
 posthog/api/plugin.py:0: error: Incompatible type for "file_size" of "PluginAttachment" (got "int | None", expected "float | int | str | Combinable") [misc]
 posthog/api/plugin.py:0: error: Item "None" of "IO[Any] | None" has no attribute "read" [union-attr]
 posthog/api/plugin.py:0: error: Item "None" of "Team | None" has no attribute "organization" [union-attr]
-posthog/api/plugin.py:0: error: Argument "organization_id" to "log_activity" has incompatible type "UUID | Any"; expected "UUIDT | None" [arg-type]
 posthog/api/plugin.py:0: error: Item "None" of "Team | None" has no attribute "id" [union-attr]
-posthog/api/plugin.py:0: error: Argument "organization_id" to "log_activity" has incompatible type "UUID"; expected "UUIDT | None" [arg-type]
-posthog/api/plugin.py:0: error: Argument "organization_id" to "log_activity" has incompatible type "UUID"; expected "UUIDT | None" [arg-type]
 posthog/temporal/tests/external_data/test_external_data_job.py:0: error: Invalid index type "str" for "dict[Type, Sequence[str]]"; expected type "Type" [index]
 posthog/temporal/tests/external_data/test_external_data_job.py:0: error: Invalid index type "str" for "dict[Type, Sequence[str]]"; expected type "Type" [index]
 posthog/temporal/tests/external_data/test_external_data_job.py:0: error: Invalid index type "str" for "dict[Type, Sequence[str]]"; expected type "Type" [index]
@@ -907,7 +888,6 @@ posthog/api/test/test_capture.py:0: error: Dict entry 0 has incompatible type "s
 posthog/api/test/batch_exports/test_update.py:0: error: Unsupported target for indexed assignment ("Collection[str]") [index]
 posthog/api/test/batch_exports/test_update.py:0: error: Unsupported target for indexed assignment ("Collection[str]") [index]
 posthog/api/test/batch_exports/test_update.py:0: error: Dict entry 1 has incompatible type "str": "dict[str, Collection[str]]"; expected "str": "str" [dict-item]
-posthog/api/test/batch_exports/test_update.py:0: error: Argument 3 to "get_batch_export_ok" has incompatible type "UUID"; expected "int" [arg-type]
 posthog/api/test/batch_exports/test_update.py:0: error: Value of type "BatchExport" is not indexable [index]
 posthog/api/test/batch_exports/test_update.py:0: error: Value of type "BatchExport" is not indexable [index]
 posthog/api/test/batch_exports/test_update.py:0: error: Value of type "BatchExport" is not indexable [index]
diff --git a/plugin-server/src/cdp/cdp-consumers.ts b/plugin-server/src/cdp/cdp-consumers.ts
index 499b030e61564..8c4eec5e11951 100644
--- a/plugin-server/src/cdp/cdp-consumers.ts
+++ b/plugin-server/src/cdp/cdp-consumers.ts
@@ -237,8 +237,6 @@ abstract class CdpConsumerBase {
         await runInstrumentedFunction({
             statsKey: `cdpConsumer.handleEachBatch.produceResults`,
             func: async () => {
-                console.log('Processing invocations results', results.length)
-
                 await Promise.all(
                     results.map(async (result) => {
                         // Tricky: We want to pull all the logs out as we don't want them to be passed around to any subsequent functions
@@ -635,12 +633,6 @@ export class CdpFunctionCallbackConsumer extends CdpConsumerBase {
                     })
                 )

-                invocations.forEach((item) => {
-                    if (!item.hogFunction?.id) {
-                        console.error('No hog function id', item)
-                    }
-                })
-
                 return invocations
             },
         })
diff --git a/posthog/api/email_verification.py b/posthog/api/email_verification.py
index e22a298fe44a0..83c12d1dfe1e9 100644
--- a/posthog/api/email_verification.py
+++ b/posthog/api/email_verification.py
@@ -14,7 +14,7 @@ def is_email_verification_disabled(user: User) -> bool:
     # using disabled here so that the default state (if no flag exists) is that verification defaults to ON.
     return user.organization is not None and posthoganalytics.feature_enabled(
         VERIFICATION_DISABLED_FLAG,
-        user.organization.id,
+        str(user.organization.id),
         groups={"organization": str(user.organization.id)},
         group_properties={"organization": {"id": str(user.organization.id)}},
     )
diff --git a/posthog/api/test/batch_exports/operations.py b/posthog/api/test/batch_exports/operations.py
index 5ac814deab1e2..20f7d2761e2bf 100644
--- a/posthog/api/test/batch_exports/operations.py
+++ b/posthog/api/test/batch_exports/operations.py
@@ -1,5 +1,6 @@
 from django.test.client import Client as TestClient
 from rest_framework import status
+from posthog.models.utils import UUIDT


 def create_batch_export(client: TestClient, team_id: int, batch_export_data: dict | str):
@@ -16,17 +17,17 @@ def create_batch_export_ok(client: TestClient, team_id: int, batch_export_data:
     return response.json()


-def pause_batch_export(client: TestClient, team_id: int, batch_export_id: int):
+def pause_batch_export(client: TestClient, team_id: int, batch_export_id: UUIDT):
     return client.post(f"/api/projects/{team_id}/batch_exports/{batch_export_id}/pause")


-def pause_batch_export_ok(client: TestClient, team_id: int, batch_export_id: int):
+def pause_batch_export_ok(client: TestClient, team_id: int, batch_export_id: UUIDT):
     response = pause_batch_export(client, team_id, batch_export_id)
     assert response.status_code == status.HTTP_200_OK, response.json()
     return response.json()


-def unpause_batch_export(client: TestClient, team_id: int, batch_export_id: int, backfill: bool = False):
+def unpause_batch_export(client: TestClient, team_id: int, batch_export_id: UUIDT, backfill: bool = False):
     return client.post(
         f"/api/projects/{team_id}/batch_exports/{batch_export_id}/unpause",
         {"backfill": backfill},
@@ -34,17 +35,17 @@ def unpause_batch_export(client: TestClient, team_id: int, batch_export_id: int,
     )


-def unpause_batch_export_ok(client: TestClient, team_id: int, batch_export_id: int, backfill: bool = False):
+def unpause_batch_export_ok(client: TestClient, team_id: int, batch_export_id: UUIDT, backfill: bool = False):
     response = unpause_batch_export(client, team_id, batch_export_id, backfill)
     assert response.status_code == status.HTTP_200_OK, response.json()
     return response.json()


-def get_batch_export(client: TestClient, team_id: int, batch_export_id: int):
+def get_batch_export(client: TestClient, team_id: int, batch_export_id: UUIDT):
     return client.get(f"/api/projects/{team_id}/batch_exports/{batch_export_id}")


-def get_batch_export_ok(client: TestClient, team_id: int, batch_export_id: int):
+def get_batch_export_ok(client: TestClient, team_id: int, batch_export_id: UUIDT):
     response = get_batch_export(client, team_id, batch_export_id)
     assert response.status_code == status.HTTP_200_OK, response.json()
     return response.json()
@@ -63,11 +64,11 @@ def get_batch_export_runs_ok(client: TestClient, team_id: int, batch_export_id:
     return response.json()


-def delete_batch_export(client: TestClient, team_id: int, batch_export_id: int):
+def delete_batch_export(client: TestClient, team_id: int, batch_export_id: UUIDT):
     return client.delete(f"/api/projects/{team_id}/batch_exports/{batch_export_id}")


-def delete_batch_export_ok(client: TestClient, team_id: int, batch_export_id: int):
+def delete_batch_export_ok(client: TestClient, team_id: int, batch_export_id: UUIDT):
     response = delete_batch_export(client, team_id, batch_export_id)
     assert response.status_code == status.HTTP_204_NO_CONTENT, response
     return response
diff --git a/posthog/api/test/test_query.py b/posthog/api/test/test_query.py
index 78339bd3f30c2..e5a85099efd08 100644
--- a/posthog/api/test/test_query.py
+++ b/posthog/api/test/test_query.py
@@ -11,8 +11,10 @@
 from posthog.models.utils import UUIDT
 from posthog.schema import (
     CachedEventsQueryResponse,
+    DataWarehouseNode,
     EventPropertyFilter,
     EventsQuery,
+    FunnelsQuery,
     HogQLPropertyFilter,
     HogQLQuery,
     PersonPropertyFilter,
@@ -731,6 +733,39 @@ def test_invalid_query_kind(self):
             api_response.content,
         )

+    def test_funnel_query_with_data_warehouse_node_temporarily_raises(self):
+        # As of September 2024, funnels don't support data warehouse tables YET, so we want a helpful error message
+        api_response = self.client.post(
+            f"/api/projects/{self.team.id}/query/",
+            {
+                "query": FunnelsQuery(
+                    series=[
+                        DataWarehouseNode(
+                            id="xyz",
+                            table_name="xyz",
+                            id_field="id",
+                            distinct_id_field="customer_email",
+                            timestamp_field="created",
+                        ),
+                        DataWarehouseNode(
+                            id="abc",
+                            table_name="abc",
+                            id_field="id",
+                            distinct_id_field="customer_email",
+                            timestamp_field="timestamp",
+                        ),
+                    ],
+                ).model_dump()
+            },
+        )
+        self.assertEqual(api_response.status_code, 400)
+        self.assertDictEqual(
+            api_response.json(),
+            self.validation_error_response(
+                "Data warehouse tables are not supported in funnels just yet. For now, please try this funnel without the data warehouse-based step."
+            ),
+        )
+
     def test_missing_query(self):
         api_response = self.client.post(f"/api/projects/{self.team.id}/query/", {"query": {}})
         self.assertEqual(api_response.status_code, 400)
diff --git a/posthog/hogql_queries/insights/funnels/base.py b/posthog/hogql_queries/insights/funnels/base.py
index 477c205dd968c..84ddf66eafccd 100644
--- a/posthog/hogql_queries/insights/funnels/base.py
+++ b/posthog/hogql_queries/insights/funnels/base.py
@@ -299,7 +299,9 @@ def _serialize_step(
             action_id = step.event
             type = "events"
         elif isinstance(step, DataWarehouseNode):
-            raise NotImplementedError("DataWarehouseNode is not supported in funnels")
+            raise ValidationError(
+                "Data warehouse tables are not supported in funnels just yet. For now, please try this funnel without the data warehouse-based step."
+            )
         else:
             action = Action.objects.get(pk=step.id)
             name = action.name
@@ -584,7 +586,9 @@ def _build_step_query(
             action = Action.objects.get(pk=int(entity.id), team=self.context.team)
             event_expr = action_to_expr(action)
         elif isinstance(entity, DataWarehouseNode):
-            raise NotImplementedError("DataWarehouseNode is not supported in funnels")
+            raise ValidationError(
+                "Data warehouse tables are not supported in funnels just yet. For now, please try this funnel without the data warehouse-based step."
+            )
         elif entity.event is None:
             # all events
             event_expr = ast.Constant(value=1)
diff --git a/posthog/hogql_queries/insights/funnels/funnel_event_query.py b/posthog/hogql_queries/insights/funnels/funnel_event_query.py
index 8acb0f7dea87b..c4cb9507534ef 100644
--- a/posthog/hogql_queries/insights/funnels/funnel_event_query.py
+++ b/posthog/hogql_queries/insights/funnels/funnel_event_query.py
@@ -7,7 +7,13 @@
 from posthog.hogql_queries.utils.query_date_range import QueryDateRange
 from posthog.models.action.action import Action
 from posthog.models.property.property import PropertyName
-from posthog.schema import ActionsNode, EventsNode, FunnelExclusionActionsNode, FunnelExclusionEventsNode
+from posthog.schema import (
+    ActionsNode,
+    DataWarehouseNode,
+    EventsNode,
+    FunnelExclusionActionsNode,
+    FunnelExclusionEventsNode,
+)
 from rest_framework.exceptions import ValidationError


@@ -143,6 +149,8 @@ def _entity_expr(self, skip_entity_filter: bool) -> ast.Expr | None:
                     events.update(action.get_step_events())
                 except Action.DoesNotExist:
                     raise ValidationError(f"Action ID {node.id} does not exist!")
+            elif isinstance(node, DataWarehouseNode):
+                continue  # Data warehouse nodes aren't based on events
             else:
                 raise ValidationError("Series and exclusions must be compose of action and event nodes")
diff --git a/posthog/hogql_queries/insights/utils/entities.py b/posthog/hogql_queries/insights/utils/entities.py
index 794ce6170da11..b14653b338035 100644
--- a/posthog/hogql_queries/insights/utils/entities.py
+++ b/posthog/hogql_queries/insights/utils/entities.py
@@ -1,6 +1,7 @@
 from posthog.schema import (
     ActionsNode,
     CohortPropertyFilter,
+    DataWarehouseNode,
     EmptyPropertyFilter,
     EventsNode,
     FunnelExclusionActionsNode,
@@ -9,16 +10,16 @@
 )
 from posthog.types import AnyPropertyFilter, EntityNode, ExclusionEntityNode
 from collections import Counter
-from rest_framework.exceptions import ValidationError


 def is_equal_type(a: EntityNode, b: EntityNode | ExclusionEntityNode) -> bool:
     if isinstance(a, EventsNode):
         return isinstance(b, EventsNode) or isinstance(b, FunnelExclusionEventsNode)
-    elif isinstance(a, ActionsNode):
+    if isinstance(a, ActionsNode):
         return isinstance(b, ActionsNode) or isinstance(b, FunnelExclusionActionsNode)
-    else:
-        raise ValidationError(detail=f"Type comparision for {type(a)} and {type(b)} not implemented.")
+    if isinstance(a, DataWarehouseNode):
+        return isinstance(b, DataWarehouseNode)
+    raise ValueError(f"Type comparison for {type(a)} and {type(b)} not implemented.")
@@ -44,6 +45,14 @@ def is_equal(a: EntityNode, b: EntityNode | ExclusionEntityNode, compare_propert
     ):
         return False

+    # different data source
+    if (
+        isinstance(a, DataWarehouseNode)
+        and isinstance(b, DataWarehouseNode)
+        and (a.id != b.id or a.id_field != b.id_field)
+    ):
+        return False
+
     # different properties
     if compare_properties and _sorted_property_reprs(a.properties) != _sorted_property_reprs(b.properties):
         return False
diff --git a/posthog/migrations/0466_alter_externaldatasource_source_type.py b/posthog/migrations/0466_alter_externaldatasource_source_type.py
new file mode 100644
index 0000000000000..4a4b2f522f68b
--- /dev/null
+++ b/posthog/migrations/0466_alter_externaldatasource_source_type.py
@@ -0,0 +1,30 @@
+# Generated by Django 4.2.15 on 2024-09-05 10:44
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+    dependencies = [
+        ("posthog", "0465_datawarehouse_stripe_account"),
+    ]
+
+    operations = [
+        migrations.AlterField(
+            model_name="externaldatasource",
+            name="source_type",
+            field=models.CharField(
+                choices=[
+                    ("Stripe", "Stripe"),
+                    ("Hubspot", "Hubspot"),
+                    ("Postgres", "Postgres"),
+                    ("Zendesk", "Zendesk"),
+                    ("Snowflake", "Snowflake"),
+                    ("Salesforce", "Salesforce"),
+                    ("MySQL", "MySQL"),
+                    ("MSSQL", "MSSQL"),
+                    ("Vitally", "Vitally"),
+                ],
+                max_length=128,
+            ),
+        ),
+    ]
diff --git a/posthog/models/utils.py b/posthog/models/utils.py
index e9498ce32990e..3dc7e83940b9a 100644
--- a/posthog/models/utils.py
+++ b/posthog/models/utils.py
@@ -164,7 +164,7 @@ class Meta:
 class UUIDModel(models.Model):
     """Base Django Model with default autoincremented ID field replaced with UUIDT."""

-    id = models.UUIDField(primary_key=True, default=UUIDT, editable=False)
+    id: models.UUIDField = models.UUIDField(primary_key=True, default=UUIDT, editable=False)

     class Meta:
         abstract = True
diff --git a/posthog/plugins/plugin_server_api.py b/posthog/plugins/plugin_server_api.py
index fd18e49d16ed7..ef6b312ba874c 100644
--- a/posthog/plugins/plugin_server_api.py
+++ b/posthog/plugins/plugin_server_api.py
@@ -4,6 +4,7 @@
 import structlog
 from posthog.redis import get_client
 from posthog.settings import CDP_FUNCTION_EXECUTOR_API_URL, PLUGINS_RELOAD_PUBSUB_CHANNEL, PLUGINS_RELOAD_REDIS_URL
+from posthog.models.utils import UUIDT

 logger = structlog.get_logger(__name__)

@@ -62,7 +63,7 @@ def populate_plugin_capabilities_on_workers(plugin_id: str):
 def create_hog_invocation_test(
     team_id: int,
-    hog_function_id: str,
+    hog_function_id: UUIDT,
     globals: dict,
     configuration: dict,
     mock_async_functions: bool,
@@ -78,13 +79,13 @@
     )


-def get_hog_function_status(team_id: int, hog_function_id: str) -> requests.Response:
+def get_hog_function_status(team_id: int, hog_function_id: UUIDT) -> requests.Response:
     return requests.get(
         CDP_FUNCTION_EXECUTOR_API_URL + f"/api/projects/{team_id}/hog_functions/{hog_function_id}/status"
     )


-def patch_hog_function_status(team_id: int, hog_function_id: str, state: int) -> requests.Response:
+def patch_hog_function_status(team_id: int, hog_function_id: UUIDT, state: int) -> requests.Response:
     return requests.patch(
         CDP_FUNCTION_EXECUTOR_API_URL + f"/api/projects/{team_id}/hog_functions/{hog_function_id}/status",
         json={"state": state},
diff --git a/posthog/tasks/email.py b/posthog/tasks/email.py
index 925dce44493d1..ee9229109b832 100644
--- a/posthog/tasks/email.py
+++ b/posthog/tasks/email.py
@@ -20,6 +20,7 @@
     Team,
     User,
 )
+from posthog.models.utils import UUIDT
 from posthog.user_permissions import UserPermissions

 logger = structlog.get_logger(__name__)

@@ -159,7 +160,7 @@ def send_fatal_plugin_error(
 def send_batch_export_run_failure(
-    batch_export_run_id: str,
+    batch_export_run_id: UUIDT,
 ) -> None:
     logger = structlog.get_logger(__name__)
diff --git a/posthog/tasks/stop_surveys_reached_target.py b/posthog/tasks/stop_surveys_reached_target.py
index 5432a45d84b19..dc8f99ee3cff0 100644
--- a/posthog/tasks/stop_surveys_reached_target.py
+++ b/posthog/tasks/stop_surveys_reached_target.py
@@ -1,16 +1,16 @@
 from itertools import groupby
 from django.db.models import Q
 from django.utils import timezone
-from uuid import UUID
 from datetime import datetime

 from posthog.clickhouse.client.connection import Workload
 from posthog.client import sync_execute
 from posthog.models import Survey
+from posthog.models.utils import UUIDT


 def _get_surveys_response_counts(
-    surveys_ids: list[UUID], team_id: int, earliest_survey_creation_date: datetime
+    surveys_ids: list[UUIDT], team_id: int, earliest_survey_creation_date: datetime
 ) -> dict[str, int]:
     data = sync_execute(
         """
diff --git a/posthog/tasks/update_survey_iteration.py b/posthog/tasks/update_survey_iteration.py
index 5218a99010252..2c6096b19261b 100644
--- a/posthog/tasks/update_survey_iteration.py
+++ b/posthog/tasks/update_survey_iteration.py
@@ -58,7 +58,7 @@ def _get_targeting_flag(survey: Survey) -> ForeignKey | ForeignKey | Any:
         team=survey.team,
         created_by=survey.created_by,
         active=True,
-        key=survey.id,
+        key=str(survey.id),
         filters=user_submitted_dismissed_filter,
     )
     new_flag.save()
diff --git a/posthog/temporal/data_imports/pipelines/schemas.py b/posthog/temporal/data_imports/pipelines/schemas.py
index 0acd00e8bd6f3..15214f939b78a 100644
--- a/posthog/temporal/data_imports/pipelines/schemas.py
+++ b/posthog/temporal/data_imports/pipelines/schemas.py
@@ -17,6 +17,11 @@
     INCREMENTAL_ENDPOINTS as SALESFORCE_INCREMENTAL_ENDPOINTS,
     INCREMENTAL_FIELDS as SALESFORCE_INCREMENTAL_FIELDS,
 )
+from posthog.temporal.data_imports.pipelines.vitally.settings import (
+    ENDPOINTS as VITALLY_ENDPOINTS,
+    INCREMENTAL_ENDPOINTS as VITALLY_INCREMENTAL_ENDPOINTS,
+    INCREMENTAL_FIELDS as VITALLY_INCREMENTAL_FIELDS,
+)

 PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING = {
     ExternalDataSource.Type.STRIPE: STRIPE_ENDPOINTS,
@@ -29,6 +34,7 @@
     ExternalDataSource.Type.SALESFORCE: SALESFORCE_ENDPOINTS,
     ExternalDataSource.Type.MYSQL: (),
     ExternalDataSource.Type.MSSQL: (),
+    ExternalDataSource.Type.VITALLY: VITALLY_ENDPOINTS,
 }

 PIPELINE_TYPE_INCREMENTAL_ENDPOINTS_MAPPING = {
@@ -40,6 +46,7 @@
     ExternalDataSource.Type.SALESFORCE: SALESFORCE_INCREMENTAL_ENDPOINTS,
     ExternalDataSource.Type.MYSQL: (),
     ExternalDataSource.Type.MSSQL: (),
+    ExternalDataSource.Type.VITALLY: VITALLY_INCREMENTAL_ENDPOINTS,
 }

 PIPELINE_TYPE_INCREMENTAL_FIELDS_MAPPING: dict[ExternalDataSource.Type, dict[str, list[IncrementalField]]] = {
@@ -51,4 +58,5 @@
     ExternalDataSource.Type.SALESFORCE: SALESFORCE_INCREMENTAL_FIELDS,
     ExternalDataSource.Type.MYSQL: {},
     ExternalDataSource.Type.MSSQL: {},
+    ExternalDataSource.Type.VITALLY: VITALLY_INCREMENTAL_FIELDS,
 }
diff --git a/posthog/temporal/data_imports/pipelines/vitally/__init__.py b/posthog/temporal/data_imports/pipelines/vitally/__init__.py
new file mode 100644
index 0000000000000..8c526017db38d
--- /dev/null
+++ b/posthog/temporal/data_imports/pipelines/vitally/__init__.py
@@ -0,0 +1,360 @@
+import base64
+from dateutil import parser
+from typing import Any, Optional
+import dlt
+from dlt.sources.helpers.rest_client.paginators import BasePaginator
+from dlt.sources.helpers.requests import Response, Request
+import requests
+from posthog.temporal.data_imports.pipelines.rest_source import RESTAPIConfig, rest_api_resources
+from posthog.temporal.data_imports.pipelines.rest_source.typing import EndpointResource
+
+
+def get_resource(name: str, is_incremental: bool) -> EndpointResource:
+    resources: dict[str, EndpointResource] = {
+        "Organizations": {
+            "name": "Organizations",
+            "table_name": "organizations",
+            "primary_key": "id",
+            "write_disposition": {
+                "disposition": "merge",
+                "strategy": "upsert",
+            }
+            if is_incremental
+            else "replace",
+            "endpoint": {
+                "data_selector": "results",
+                "path": "/resources/organizations",
+                "params": {
+                    "limit": 100,
+                    "sortBy": "updatedAt",
+                    "updatedAt": {
+                        "type": "incremental",
+                        "cursor_path": "updatedAt",
+                        "initial_value": "1970-01-01",  # type: ignore
+                        "convert": lambda x: parser.parse(x).timestamp(),
+                    }
+                    if is_incremental
+                    else None,
+                },
+            },
+            "table_format": "delta",
+        },
+        "Accounts": {
+            "name": "Accounts",
+            "table_name": "accounts",
+            "primary_key": "id",
+            "write_disposition": {
+                "disposition": "merge",
+                "strategy": "upsert",
+            }
+            if is_incremental
+            else "replace",
+            "endpoint": {
+                "data_selector": "results",
+                "path": "/resources/accounts",
+                "params": {
+                    "limit": 100,
+                    "sortBy": "updatedAt",
+                    "updatedAt": {
+                        "type": "incremental",
+                        "cursor_path": "updatedAt",
+                        "initial_value": "1970-01-01",  # type: ignore
+                        "convert": lambda x: parser.parse(x).timestamp(),
+                    }
+                    if is_incremental
+                    else None,
+                },
+            },
+            "table_format": "delta",
+        },
+        "Users": {
+            "name": "Users",
+            "table_name": "users",
+            "primary_key": "id",
+            "write_disposition": {
+                "disposition": "merge",
+                "strategy": "upsert",
+            }
+            if is_incremental
+            else "replace",
+            "endpoint": {
+                "data_selector": "results",
+                "path": "/resources/users",
+                "params": {
+                    "limit": 100,
+                    "sortBy": "updatedAt",
+                    "updatedAt": {
+                        "type": "incremental",
+                        "cursor_path": "updatedAt",
+                        "initial_value": "1970-01-01",  # type: ignore
+                        "convert": lambda x: parser.parse(x).timestamp(),
+                    }
+                    if is_incremental
+                    else None,
+                },
+            },
+            "table_format": "delta",
+        },
+        "Conversations": {
+            "name": "Conversations",
+            "table_name": "conversations",
+            "primary_key": "id",
+            "write_disposition": {
+                "disposition": "merge",
+                "strategy": "upsert",
+            }
+            if is_incremental
+            else "replace",
+            "endpoint": {
+                "data_selector": "results",
+                "path": "/resources/conversations",
+                "params": {
+                    "limit": 100,
+                    "sortBy": "updatedAt",
+                    "updatedAt": {
+                        "type": "incremental",
+                        "cursor_path": "updatedAt",
+                        "initial_value": "1970-01-01",  # type: ignore
+                        "convert": lambda x: parser.parse(x).timestamp(),
+                    }
+                    if is_incremental
+                    else None,
+                },
+            },
+            "table_format": "delta",
+        },
+        "Notes": {
+            "name": "Notes",
+            "table_name": "notes",
+            "primary_key": "id",
+            "write_disposition": {
+                "disposition": "merge",
+                "strategy": "upsert",
+            }
+            if is_incremental
+            else "replace",
+            "endpoint": {
+                "data_selector": "results",
+                "path": "/resources/notes",
+                "params": {
+                    "limit": 100,
+                    "sortBy": "updatedAt",
+                    "updatedAt": {
+                        "type": "incremental",
+                        "cursor_path": "updatedAt",
+                        "initial_value": "1970-01-01",  # type: ignore
+                        "convert": lambda x: parser.parse(x).timestamp(),
+                    }
+                    if is_incremental
+                    else None,
+                },
+            },
+            "table_format": "delta",
+        },
+        "Projects": {
+            "name": "Projects",
+            "table_name": "projects",
+            "primary_key": "id",
+            "write_disposition": {
+                "disposition": "merge",
+                "strategy": "upsert",
+            }
+            if is_incremental
+            else "replace",
+            "endpoint": {
+                "data_selector": "results",
+                "path": "/resources/projects",
+                "params": {
+                    "limit": 100,
+                    "sortBy": "updatedAt",
+                    "updatedAt": {
+                        "type": "incremental",
+                        "cursor_path": "updatedAt",
+                        "initial_value": "1970-01-01",  # type: ignore
+                        "convert": lambda x: parser.parse(x).timestamp(),
+                    }
+                    if is_incremental
+                    else None,
+                },
+            },
+            "table_format": "delta",
+        },
+        "Tasks": {
+            "name": "Tasks",
+            "table_name": "tasks",
+            "primary_key": "id",
+            "write_disposition": {
+                "disposition": "merge",
+                "strategy": "upsert",
+            }
+            if is_incremental
+            else "replace",
+            "endpoint": {
+                "data_selector": "results",
+                "path": "/resources/tasks",
+                "params": {
+                    "limit": 100,
+                    "sortBy": "updatedAt",
+                    "updatedAt": {
+                        "type": "incremental",
+                        "cursor_path": "updatedAt",
+                        "initial_value": "1970-01-01",  # type: ignore
+                        "convert": lambda x: parser.parse(x).timestamp(),
+                    }
+                    if is_incremental
+                    else None,
+                },
+            },
+            "table_format": "delta",
+        },
+        "NPS_Responses": {
+            "name": "NPS_Responses",
+            "table_name": "nps_responses",
+            "primary_key": "id",
+            "write_disposition": {
+                "disposition": "merge",
+                "strategy": "upsert",
+            }
+            if is_incremental
+            else "replace",
+            "endpoint": {
+                "data_selector": "results",
+                "path": "/resources/npsResponses",
+                "params": {
+                    "limit": 100,
+                    "sortBy": "updatedAt",
+                    "updatedAt": {
+                        "type": "incremental",
+                        "cursor_path": "updatedAt",
+                        "initial_value": "1970-01-01",  # type: ignore
+                        "convert": lambda x: parser.parse(x).timestamp(),
+                    }
+                    if is_incremental
+                    else None,
+                },
+            },
+            "table_format": "delta",
+        },
+        "Custom_Objects": {
+            "name": "Custom_Objects",
+            "table_name": "custom_objects",
+            "primary_key": "id",
+            "write_disposition": {
+                "disposition": "merge",
+                "strategy": "upsert",
+            }
+            if is_incremental
+            else "replace",
+            "endpoint": {
+                "data_selector": "results",
+                "path": "/resources/customObjects",
+                "params": {
+                    "limit": 100,
+                    "sortBy": "updatedAt",
+                    "updatedAt": {
+                        "type": "incremental",
+                        "cursor_path": "updatedAt",
+                        "initial_value": "1970-01-01",  # type: ignore
+                        "convert": lambda x: parser.parse(x).timestamp(),
+                    }
+                    if is_incremental
+                    else None,
+                },
+            },
+            "table_format": "delta",
+        },
+    }
+
+    return resources[name]
+
+
+class VitallyPaginator(BasePaginator):
+    def __init__(self) -> None:
+        super().__init__()
+
+    def update_state(self, response: Response, data: Optional[list[Any]] = None) -> None:
+        res = response.json()
+
+        current_source = dlt.current.get_source()
+        resources = current_source.resources
+        current_resource = next(iter(resources.values()))
+        incremental = current_resource.incremental.incremental
+
+        self._cursor = None
+
+        if not res:
+            self._has_next_page = False
+            return
+
+        if incremental:
+            updated_at_str = res["results"][0]["updatedAt"]
+            updated_at = parser.parse(updated_at_str).timestamp()
+            start_value = parser.parse(incremental.start_value).timestamp()
+
+            if start_value >= updated_at:
+                self._has_next_page = False
+                return
+
+        if res["next"]:
+            self._has_next_page = True
+            self._cursor = res["next"]
+        else:
+            self._has_next_page = False
+
+    def update_request(self, request: Request) -> None:
+        if request.params is None:
+            request.params = {}
+
+        request.params["from"] = self._cursor
+
+
+def get_base_url(region: str, subdomain: Optional[str]) -> str:
+    if region == "US" and subdomain:
+        return f"https://{subdomain}.rest.vitally.io/"
+
+    return "https://rest.vitally-eu.io/"
+
+
+@dlt.source(max_table_nesting=0)
+def vitally_source(
+    secret_token: str,
+    region: str,
+    subdomain: Optional[str],
+    endpoint: str,
+    team_id: int,
+    job_id: str,
+    is_incremental: bool = False,
+):
+    config: RESTAPIConfig = {
+        "client": {
+            "base_url": get_base_url(region, subdomain),
+            "auth": {
+                "type": "http_basic",
+                "username": secret_token,
+                "password": "",
+            },
+            "paginator": VitallyPaginator(),
+        },
+        "resource_defaults": {
+            "primary_key": "id",
+            "write_disposition": {
+                "disposition": "merge",
+                "strategy": "upsert",
+            }
+            if is_incremental
+            else "replace",
+        },
+        "resources": [get_resource(endpoint, is_incremental)],
+    }
+
+    yield from rest_api_resources(config, team_id, job_id)
+
+
+def validate_credentials(secret_token: str, region: str, subdomain: Optional[str]) -> bool:
+    basic_token = base64.b64encode(f"{secret_token}:".encode("ascii")).decode("ascii")
+    res = requests.get(
+        f"{get_base_url(region, subdomain)}resources/users?limit=1",
+        headers={"Authorization": f"Basic {basic_token}"},
+    )
+
+    return res.status_code == 200
diff --git a/posthog/temporal/data_imports/pipelines/vitally/settings.py b/posthog/temporal/data_imports/pipelines/vitally/settings.py
new file mode 100644
index 0000000000000..a16d9565f5d1c
--- /dev/null
+++ b/posthog/temporal/data_imports/pipelines/vitally/settings.py
@@ -0,0 +1,108 @@
+from posthog.warehouse.types import IncrementalField, IncrementalFieldType
+
+ENDPOINTS = (
+    "Organizations",
+    "Accounts",
+    "Users",
+    "Conversations",
+    "Notes",
+    "Projects",
+    "Tasks",
+    "NPS_Responses",
+    "Custom_Objects",
+)
+
+INCREMENTAL_ENDPOINTS = (
+    "Organizations",
+    "Accounts",
+    "Users",
+    "Conversations",
+    "Notes",
+    "Projects",
+    "Tasks",
+    "NPS_Responses",
+    "Custom_Objects",
+)
+
+INCREMENTAL_FIELDS: dict[str, list[IncrementalField]] = {
+    "Organizations": [
+        {
+            "label": "updated_at",
+            "type": IncrementalFieldType.DateTime,
+            "field": "updated_at",
+            "field_type": IncrementalFieldType.DateTime,
+        }
+    ],
+    "Accounts": [
+        {
+            "label": "updated_at",
+            "type": IncrementalFieldType.DateTime,
+            "field": "updated_at",
+            "field_type": IncrementalFieldType.DateTime,
+        }
+    ],
+    "Users": [
+        {
+            "label": "updated_at",
+            "type": IncrementalFieldType.DateTime,
+            "field": "updated_at",
+            "field_type": IncrementalFieldType.DateTime,
+        }
+    ],
+    "Conversations": [
+        {
+            "label": "updated_at",
+            "type": IncrementalFieldType.DateTime,
+            "field": "updated_at",
+            "field_type": IncrementalFieldType.DateTime,
+        }
+    ],
+    "Notes": [
+        {
+            "label": "updated_at",
+            "type": IncrementalFieldType.DateTime,
+            "field": "updated_at",
+            "field_type": IncrementalFieldType.DateTime,
+        }
+    ],
+    "Projects": [
+        {
+            "label": "updated_at",
+            "type": IncrementalFieldType.DateTime,
+            "field": "updated_at",
+            "field_type": IncrementalFieldType.DateTime,
+        }
+    ],
+    "Tasks": [
+        {
+            "label": "updated_at",
+            "type": IncrementalFieldType.DateTime,
+            "field": "updated_at",
+            "field_type": IncrementalFieldType.DateTime,
+        }
+    ],
+    "NPS_Responses": [
+        {
+            "label": "updated_at",
+            "type": IncrementalFieldType.DateTime,
+            "field": "updated_at",
+            "field_type": IncrementalFieldType.DateTime,
+        }
+    ],
+    "Custom_Fields": [
+        {
+            "label": "updated_at",
+            "type": IncrementalFieldType.DateTime,
+            "field": "updated_at",
+            "field_type": IncrementalFieldType.DateTime,
+        }
+    ],
+    "Custom_Objects": [
+        {
+            "label": "updated_at",
+            "type": IncrementalFieldType.DateTime,
+            "field": "updated_at",
+            "field_type": IncrementalFieldType.DateTime,
+        }
+    ],
+}
diff --git a/posthog/temporal/data_imports/workflow_activities/import_data.py b/posthog/temporal/data_imports/workflow_activities/import_data.py
index 6ce4237f53711..73706e1191589 100644
--- a/posthog/temporal/data_imports/workflow_activities/import_data.py
+++ b/posthog/temporal/data_imports/workflow_activities/import_data.py
@@ -283,6 +283,27 @@ async def import_data_activity(inputs: ImportDataActivityInputs):
             is_incremental=schema.is_incremental,
         )

+        return await _run(
+            job_inputs=job_inputs,
+            source=source,
+            logger=logger,
+            inputs=inputs,
+            schema=schema,
+            reset_pipeline=reset_pipeline,
+        )
+    elif model.pipeline.source_type == ExternalDataSource.Type.VITALLY:
+        from posthog.temporal.data_imports.pipelines.vitally import vitally_source
+
+        source = vitally_source(
+            secret_token=model.pipeline.job_inputs.get("secret_token"),
+            region=model.pipeline.job_inputs.get("region"),
+            subdomain=model.pipeline.job_inputs.get("subdomain"),
+            endpoint=schema.name,
+            team_id=inputs.team_id,
+            job_id=inputs.run_id,
+            is_incremental=schema.is_incremental,
+        )
+
         return await _run(
             job_inputs=job_inputs,
             source=source,
diff --git a/posthog/warehouse/api/external_data_source.py b/posthog/warehouse/api/external_data_source.py
index 2e3f66de9c630..24439fcecdc19 100644
--- a/posthog/warehouse/api/external_data_source.py
+++ b/posthog/warehouse/api/external_data_source.py
@@ -24,6 +24,7 @@
 from posthog.hogql.database.database import create_hogql_database
 from posthog.temporal.data_imports.pipelines.stripe import validate_credentials as validate_stripe_credentials
 from posthog.temporal.data_imports.pipelines.zendesk import validate_credentials as validate_zendesk_credentials
+from posthog.temporal.data_imports.pipelines.vitally import validate_credentials as validate_vitally_credentials
 from posthog.temporal.data_imports.pipelines.schemas import (
     PIPELINE_TYPE_INCREMENTAL_ENDPOINTS_MAPPING,
     PIPELINE_TYPE_INCREMENTAL_FIELDS_MAPPING,
@@ -280,6 +281,8 @@ def create(self, request: Request, *args: Any, **kwargs: Any) -> Response:
             new_source_model = self._handle_zendesk_source(request, *args, **kwargs)
         elif source_type == ExternalDataSource.Type.SALESFORCE:
             new_source_model = self._handle_salesforce_source(request, *args, **kwargs)
+        elif source_type == ExternalDataSource.Type.VITALLY:
+            new_source_model = self._handle_vitally_source(request, *args, **kwargs)
         elif source_type in [
             ExternalDataSource.Type.POSTGRES,
             ExternalDataSource.Type.MYSQL,
@@ -395,6 +398,28 @@ def _handle_stripe_source(self, request: Request, *args: Any, **kwargs: Any) ->

         return new_source_model

+    def _handle_vitally_source(self, request: Request, *args: Any, **kwargs: Any) -> ExternalDataSource:
+        payload = request.data["payload"]
+        secret_token = payload.get("secret_token")
+        region = payload.get("region")
+        subdomain = payload.get("subdomain", None)
+        prefix = request.data.get("prefix", None)
+        source_type = request.data["source_type"]
+
+        # TODO: remove dummy vars
+        new_source_model = ExternalDataSource.objects.create(
+            source_id=str(uuid.uuid4()),
+            connection_id=str(uuid.uuid4()),
+            destination_id=str(uuid.uuid4()),
+            team=self.team,
+            status="Running",
+            source_type=source_type,
+            job_inputs={"secret_token": secret_token, "region": region, "subdomain": subdomain},
+            prefix=prefix,
+        )
+
+        return new_source_model
+
     def _handle_zendesk_source(self, request: Request, *args: Any, **kwargs: Any) -> ExternalDataSource:
         payload = request.data["payload"]
         api_key = payload.get("api_key")
@@ -690,6 +715,15 @@ def database_schema(self, request: Request, *arg: Any, **kwargs: Any):
                     status=status.HTTP_400_BAD_REQUEST,
                     data={"message": "Invalid credentials: Zendesk credentials are incorrect"},
                 )
+        elif source_type == ExternalDataSource.Type.VITALLY:
+            secret_token = request.data.get("secret_token", "")
+            region = request.data.get("region", "")
+            subdomain = request.data.get("subdomain", "")
+            if not validate_vitally_credentials(subdomain=subdomain, secret_token=secret_token, region=region):
+                return Response(
+                    status=status.HTTP_400_BAD_REQUEST,
+                    data={"message": "Invalid credentials: Vitally credentials are incorrect"},
+                )

         # Get schemas and validate SQL credentials
         if source_type in [
diff --git a/posthog/warehouse/models/external_data_schema.py b/posthog/warehouse/models/external_data_schema.py
index 83f16eaa9aa1f..a3ba7730aaaa3 100644
--- a/posthog/warehouse/models/external_data_schema.py
+++ b/posthog/warehouse/models/external_data_schema.py
@@ -90,7 +90,9 @@ def aget_schema_if_exists(schema_name: str, team_id: int, source_id: uuid.UUID)

 @database_sync_to_async
 def aget_schema_by_id(schema_id: str, team_id: int) -> ExternalDataSchema | None:
-    return ExternalDataSchema.objects.prefetch_related("source").get(id=schema_id, team_id=team_id)
+    return (
+        ExternalDataSchema.objects.prefetch_related("source").exclude(deleted=True).get(id=schema_id, team_id=team_id)
+    )


 @database_sync_to_async
diff --git a/posthog/warehouse/models/external_data_source.py b/posthog/warehouse/models/external_data_source.py
index 6f9fe14e01dd9..14dd7c99dd88c 100644
--- a/posthog/warehouse/models/external_data_source.py
+++ b/posthog/warehouse/models/external_data_source.py
@@ -23,6 +23,7 @@ class Type(models.TextChoices):
         SALESFORCE = "Salesforce", "Salesforce"
         MYSQL = "MySQL", "MySQL"
         MSSQL = "MSSQL", "MSSQL"
+        VITALLY = "Vitally", "Vitally"

     class Status(models.TextChoices):
         RUNNING = "Running", "Running"
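
A quick sanity-check sketch of the new Vitally helpers, based only on the code added in this diff (the token value is hypothetical):

```python
from posthog.temporal.data_imports.pipelines.vitally import get_base_url, validate_credentials

# Region selection mirrors the source wizard: EU is the default and ignores any
# subdomain, while US builds a per-account host from the configured subdomain.
assert get_base_url("EU", None) == "https://rest.vitally-eu.io/"
assert get_base_url("US", "acme") == "https://acme.rest.vitally.io/"

# validate_credentials() makes a Basic-auth GET to <base_url>resources/users?limit=1
# and treats an HTTP 200 as valid credentials.
if validate_credentials(secret_token="sk_live_hypothetical", region="US", subdomain="acme"):
    print("Vitally credentials look valid")
```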