From 1d6ba3c60ba7378ff94f2dff17bb5dde9c0476c7 Mon Sep 17 00:00:00 2001 From: Eric Duong Date: Tue, 2 Jan 2024 16:02:15 -0500 Subject: [PATCH] feat(data-warehouse): hubspot integration (#19529) * initial api * urls * refactor source selector frontend * refactor source selector frontend * field config * http working * add hubspot dlt helpers * remove products endpoint and add token refresh * reformat * add limiting * Update UI snapshots for `chromium` (1) * Update UI snapshots for `chromium` (1) * typing and migration * typing * update latest migration * add hubspot logo * Update UI snapshots for `chromium` (1) * add prefix flow --------- Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com> --- frontend/public/hubspot-logo.png | Bin 0 -> 4585 bytes frontend/src/lib/api.ts | 6 +- frontend/src/scenes/appScenes.ts | 1 + .../data-warehouse/external/SourceModal.tsx | 118 ++++++----- .../external/sourceFormLogic.ts | 135 ++++++++++++ .../external/sourceModalLogic.ts | 88 ++++---- .../redirect/DataWarehouseRedirectScene.tsx | 34 +++ .../settings/DataWarehouseSettingsScene.tsx | 2 +- frontend/src/scenes/sceneTypes.ts | 1 + frontend/src/scenes/scenes.ts | 4 + frontend/src/scenes/urls.ts | 1 + frontend/src/types.ts | 15 +- latest_migrations.manifest | 2 +- posthog/api/test/test_preflight.py | 1 + ...80_alter_externaldatasource_source_type.py | 17 ++ posthog/settings/__init__.py | 2 +- .../{airbyte.py => data_warehouse.py} | 3 + .../data_imports/external_data_job.py | 19 ++ .../pipelines/hubspot/__init__.py | 146 +++++++++++++ .../data_imports/pipelines/hubspot/auth.py | 42 ++++ .../data_imports/pipelines/hubspot/helpers.py | 198 ++++++++++++++++++ .../pipelines/hubspot/settings.py | 106 ++++++++++ .../data_imports/pipelines/schemas.py | 8 +- posthog/views.py | 2 + posthog/warehouse/api/external_data_source.py | 70 +++++-- .../api/test/test_external_data_source.py | 10 +- .../warehouse/models/external_data_source.py | 1 + 27 files changed, 915 
insertions(+), 117 deletions(-) create mode 100644 frontend/public/hubspot-logo.png create mode 100644 frontend/src/scenes/data-warehouse/external/sourceFormLogic.ts create mode 100644 frontend/src/scenes/data-warehouse/redirect/DataWarehouseRedirectScene.tsx create mode 100644 posthog/migrations/0380_alter_externaldatasource_source_type.py rename posthog/settings/{airbyte.py => data_warehouse.py} (75%) create mode 100644 posthog/temporal/data_imports/pipelines/hubspot/__init__.py create mode 100644 posthog/temporal/data_imports/pipelines/hubspot/auth.py create mode 100644 posthog/temporal/data_imports/pipelines/hubspot/helpers.py create mode 100644 posthog/temporal/data_imports/pipelines/hubspot/settings.py diff --git a/frontend/public/hubspot-logo.png b/frontend/public/hubspot-logo.png new file mode 100644 index 0000000000000000000000000000000000000000..ecb0a3804f05c12b5eb3fd0ff25908572cdc7610 GIT binary patch literal 4585 zcmbVPc|26>|DS1u>{lauj;s}COoQx8vfSb(gveNC)G)?m218^dqL?h9tX*4W&7K&! 
zkqk+e5*oWCA$wWscj*4Q_kLgBKfbT~$2sTuJkR^{{(R17d7g8kPg$Dsa*1(4AP`=2 zGvm|XvvvF8WC!oX_9dC%14=$^Y6K~HDKQNSdq`#uWC(<>dHaGU9p)1UHCg7yhBnmn zS(ZLQ!gfE$XN1B;l4L)+4nKyC59uI^NpLXXEjH#c#$2RB-{7W(_v4Hrj1}pLccr_# zWYKV@RR(!?@=NnW>Tvs~(GT8O>~nqVWv@Y~)O(-mw6NrIiLp^1IyBF?qcMCFyE_|s z1iPkvWPW&FcV&W^r~qWszfElHib{xqN~2s1qIg6E4#3#2w$lEk7=AAV(R?!AvcdJ$O-hbk|8ZrE~A~ zfyY;~S3}nNSdZLgzcGO=8d|*3PU>w)@yGmAY{_~P8J)S#Nqh;b3N2Qs0o81&xFj#r zfYAZ;T0w9ejCFl6n$0W~t-MIVQ9#T*+oG*;+YkRWK_gepL*3@umktPOm|+Eo9L7C&}4 zIh=TI{A<(YWhwltKw|h8Vt+@V&MWTL>Z!UmMCO3L`DnT1>Ug+|kJ+f6VR=#i!y}@V zQhI~w*B@mgjrAIeo<#QZb#b@u={yV}yI<~{PrTWeVIcky1k=w`J?7_w8CAy-fP${R z;aJrt2lxlpI}Qfzq<*RYhs+|U_Bs2g`7~>TOQu zCq2TfG;AF=Hb3IvzZ9UCnPN9>P%ExjPklD){#`Lv*+EG7gTw*IVvshfAe2t)yI03; zHYY+_2pOs6erXw)d%-m5(V?1POf=ods)qN=yNHo?=+_4wj-_qCR?cQ|=(kY!QwbMa zJbzcjojg`lm}ga4ibj;a3|n$!G0sBwm#hlk>eeB>$9**lCGY(*N@vV4W6jNsKE*#c zGci9C`d#vyz38Fh2@Rm^zMOX$re;;_Wk<^ptNs+5WpZBVldX`NYQA?2;#By}4u-t` z@6mm6-_JKb!;S}zY)&G5fOE1kK$E9BtyEupE~)rjYtp#JnGK%++(c5rIguWg0#{>R zq5pHok7=Xy2L96%8+Ras9=e<|0PWp$=M(PqOV6GAJapDy8d2^DSsJ)a?jGqj^`D~I z5^?U!8GL&X0IlArej%C_##D-F=#h41XL~Dt+lksUq&nWUrgV}j3z6oKRj*K-T=%Xz zku51o;r0XdIQ0SA^5B!LS{Cz?cwslqNBxszqM(8}6grVK(`{Qv{zJ@BFrlaq5vA>0F9=!D2sZN=f>c5*qE)5Djx^Ah+qSC^f|y^zxd6?>)GJzH*1%j=FmD02@AQt_I^R4afbaxM}!=yz0Ntj!Am)9o?hB)3NZ*kl5Z?7G>b+VDR3% zsO_{p#>51-$D7D=v|%uOd4o=oH&zbK*wf)XQa3n+sS**$%1+KI_?-8Z6*AS%P);hg z3p1@NgiC3sJAZGI99h27SAtRR4)N8*cKcemd(s*5-<84R-C(N&!#MW9n|l^;4Pt`ya;7wIAcK?pwNtm4XkBLMFI3TJANz zD_p(FdO7~ym?w(giu-MS=_Pu1eQ;P+`@#$5e&wD4bEnRB7<==zk0C7Eu)M(jwdN#d zIYH`o(S&Q?-)L7uVPnT^7p6zGB+87eG-T6;gG7bF(0ZvaI}7J~Y+aQ(qu93k%UKlc`yUhzWsQcB_xi(n#ij_;`|e?^w4KuUJ<%bC}` zMKu?%lvL(N9nEZRr9H}3n>r+I-Nb)eX^<7H*g|6~T1eb-x9pG)m^x?vfoCIA<#?hFi|Ja5 zoqH}u=mZoW6&f>XkFvYQm%X?$fsV+nFvh#vtZ`9L^?d<#5Q)x@IQnVG!@#MWp_g5j zwArUL*p4+d$>57a0V?=+Ukijbo>$6w_G9E{9&SOA01^YAKchQCz!{< zW0ffy_fAv;| zI5yqGnkiR}sgACa8wIRmnUB9vQQpE1f;NVPlDf8ASKJfi50a&UWk%(5D?! 
z><*@|n59k&XLS>x3mucr(;Gt9yC={ zIm)NKMxT?Yf`U0vmAEE{mHzywvH~5|Ejrp>gt7TerXqnt00PNljWc+4cU+3b;q!6{T;b@{ZKuG;Wlz$Lq`eVb<>d zS`rCV*K$3-Y|h;louHIxaS`dp(|R|R_fA;dld@?UiP^qolI@wzT4GL2Ok3NN9&ST6 znHJh|c7Gh=Tu8-|Z;z4HWrEGB2wGy+bEYw8elk_938i*ei2q(^e*XCaDV`|v-h{}9 zCZgXh2@yHx>anXStjUUSb`Px0>eNw(gnPyYzg;Ja#1TDbPV;oU)*=*>D0_Wk{d@P} zBhF9SqF}Bcu0p4qS6XXkNT)iFI+RH^8Eznfjn{=d9FmQ+Pxq=yk1Oz)9zp_Tp2xZJ zY8y$p>?!sZ zC$-#tyj5JWK5jS_sy7MzErdXH^r<9QcP|_TaKoL)6Ll5l>z*h8c&x61otg#Gf^-ab z0dE%Mi?a^0v~dsea@WKv=<9LmP_;k;Zyd!Hpn4OCWG$+$!Va$%INqK{D1abetcTWV zW0OB2prosCfkGi^ArJup0V)AURD68TBUCjtH4#V@0)_~PAuvJ=TFt~)b++bSyHOCwydWQ3{; za@#Ec(D{MaGQ^U%#UDnjaTj-l9})q-V~OPIOUBs*l5n~Tr*UK-KVNs;4#T$6pCre8 zajp~`Ru6?lsVXB?l~qwXi2qpHw(*z3aVo_etEcIKc13BrX@V+PRb{j~@~E;a(gUfC z^-x2rp^+X~HS}LZKUDrnW8wy~sG-zQ>S~&*XjKhOwWG+NmUkBaiMR3bbHA|tDtdqM z{x!FQ)j@1Wmw^8flpW#EsQe`U5wah^zoq4WPZjBJRj{3>onqY15V!zl=*XWL0wpa& zKgtCkUp)s`FE1a!!qwBy7Y|tD3H~^|udB{B{{InRnd~_GuN(sF_#cJy(;=!W>2DR literal 0 HcmV?d00001 diff --git a/frontend/src/lib/api.ts b/frontend/src/lib/api.ts index de94171d3d311..06a914a88a253 100644 --- a/frontend/src/lib/api.ts +++ b/frontend/src/lib/api.ts @@ -29,9 +29,9 @@ import { EventType, Experiment, ExportedAssetType, + ExternalDataSourceCreatePayload, ExternalDataSourceSchema, ExternalDataStripeSource, - ExternalDataStripeSourceCreatePayload, FeatureFlagAssociatedRoleType, FeatureFlagType, Group, @@ -1756,9 +1756,7 @@ const api = { async list(): Promise> { return await new ApiRequest().externalDataSources().get() }, - async create( - data: Partial - ): Promise { + async create(data: Partial): Promise { return await new ApiRequest().externalDataSources().create({ data }) }, async delete(sourceId: ExternalDataStripeSource['id']): Promise { diff --git a/frontend/src/scenes/appScenes.ts b/frontend/src/scenes/appScenes.ts index 8c7a8c5ab8c09..021d4b883ae61 100644 --- a/frontend/src/scenes/appScenes.ts +++ b/frontend/src/scenes/appScenes.ts @@ -41,6 +41,7 @@ export const appScenes: Record any> = { 
[Scene.DataWarehouseExternal]: () => import('./data-warehouse/external/DataWarehouseExternalScene'), [Scene.DataWarehouseSavedQueries]: () => import('./data-warehouse/saved_queries/DataWarehouseSavedQueriesScene'), [Scene.DataWarehouseSettings]: () => import('./data-warehouse/settings/DataWarehouseSettingsScene'), + [Scene.DataWarehouseRedirect]: () => import('./data-warehouse/redirect/DataWarehouseRedirectScene'), [Scene.OrganizationCreateFirst]: () => import('./organization/Create'), [Scene.OrganizationCreationConfirm]: () => import('./organization/ConfirmOrganization/ConfirmOrganization'), [Scene.ProjectHomepage]: () => import('./project-homepage/ProjectHomepage'), diff --git a/frontend/src/scenes/data-warehouse/external/SourceModal.tsx b/frontend/src/scenes/data-warehouse/external/SourceModal.tsx index e09050ca89d18..436fc02002820 100644 --- a/frontend/src/scenes/data-warehouse/external/SourceModal.tsx +++ b/frontend/src/scenes/data-warehouse/external/SourceModal.tsx @@ -1,37 +1,47 @@ -import { LemonButton, LemonDivider, LemonInput, LemonModal, LemonModalProps } from '@posthog/lemon-ui' +import { LemonButton, LemonDivider, LemonInput, LemonModal, LemonModalProps, Link } from '@posthog/lemon-ui' import { useActions, useValues } from 'kea' import { Form } from 'kea-forms' import { Field } from 'lib/forms/Field' +import hubspotLogo from 'public/hubspot-logo.png' import stripeLogo from 'public/stripe-logo.svg' +import { ExternalDataSourceType } from '~/types' + import { DatawarehouseTableForm } from '../new_table/DataWarehouseTableForm' +import { SOURCE_DETAILS, sourceFormLogic } from './sourceFormLogic' import { ConnectorConfigType, sourceModalLogic } from './sourceModalLogic' interface SourceModalProps extends LemonModalProps {} export default function SourceModal(props: SourceModalProps): JSX.Element { - const { tableLoading, isExternalDataSourceSubmitting, selectedConnector, isManualLinkFormVisible, connectors } = + const { tableLoading, selectedConnector, 
isManualLinkFormVisible, connectors, addToHubspotButtonUrl } = useValues(sourceModalLogic) - const { selectConnector, toggleManualLinkFormVisible, resetExternalDataSource, resetTable } = - useActions(sourceModalLogic) + const { selectConnector, toggleManualLinkFormVisible, onClear } = useActions(sourceModalLogic) const MenuButton = (config: ConnectorConfigType): JSX.Element => { const onClick = (): void => { selectConnector(config) } - return ( - - {`stripe - - ) - } + if (config.name === 'Stripe') { + return ( + + {`stripe + + ) + } + if (config.name === 'Hubspot') { + return ( + + + {`hubspot + Hubspot + + + ) + } - const onClear = (): void => { - selectConnector(null) - toggleManualLinkFormVisible(false) - resetExternalDataSource() - resetTable() + return <> } const onManualLinkClick = (): void => { @@ -40,39 +50,7 @@ export default function SourceModal(props: SourceModalProps): JSX.Element { const formToShow = (): JSX.Element => { if (selectedConnector) { - return ( -
- - - - - - - - - - -
- - Back - - - Link - -
- - ) + return } if (isManualLinkFormVisible) { @@ -131,3 +109,47 @@ export default function SourceModal(props: SourceModalProps): JSX.Element { ) } + +interface SourceFormProps { + sourceType: ExternalDataSourceType +} + +function SourceForm({ sourceType }: SourceFormProps): JSX.Element { + const logic = sourceFormLogic({ sourceType }) + const { isExternalDataSourceSubmitting } = useValues(logic) + const { onBack } = useActions(logic) + + return ( +
+ + + + {SOURCE_DETAILS[sourceType].fields.map((field) => ( + + + + ))} + +
+ + Back + + + Link + +
+ + ) +} diff --git a/frontend/src/scenes/data-warehouse/external/sourceFormLogic.ts b/frontend/src/scenes/data-warehouse/external/sourceFormLogic.ts new file mode 100644 index 0000000000000..67877f9a32205 --- /dev/null +++ b/frontend/src/scenes/data-warehouse/external/sourceFormLogic.ts @@ -0,0 +1,135 @@ +import { lemonToast } from '@posthog/lemon-ui' +import { actions, connect, kea, listeners, path, props } from 'kea' +import { forms } from 'kea-forms' +import { router, urlToAction } from 'kea-router' +import api from 'lib/api' +import { urls } from 'scenes/urls' + +import { ExternalDataSourceCreatePayload, ExternalDataSourceType } from '~/types' + +import type { sourceFormLogicType } from './sourceFormLogicType' +import { getHubspotRedirectUri, sourceModalLogic } from './sourceModalLogic' + +export interface SourceFormProps { + sourceType: ExternalDataSourceType +} + +interface SourceConfig { + name: string + caption: string + fields: FieldConfig[] +} +interface FieldConfig { + name: string + label: string + type: string + required: boolean +} + +export const SOURCE_DETAILS: Record = { + Stripe: { + name: 'Stripe', + caption: 'Enter your Stripe credentials to link your Stripe to PostHog', + fields: [ + { + name: 'account_id', + label: 'Account ID', + type: 'text', + required: true, + }, + { + name: 'client_secret', + label: 'Client Secret', + type: 'text', + required: true, + }, + ], + }, +} + +const getPayloadDefaults = (sourceType: string): Record => { + switch (sourceType) { + case 'Stripe': + return { + account_id: '', + client_secret: '', + } + default: + return {} + } +} + +const getErrorsDefaults = (sourceType: string): ((args: Record) => Record) => { + switch (sourceType) { + case 'Stripe': + return ({ payload }) => ({ + payload: { + account_id: !payload.account_id && 'Please enter an account id.', + client_secret: !payload.client_secret && 'Please enter a client secret.', + }, + }) + default: + return () => ({}) + } +} + +export const sourceFormLogic = 
kea([ + path(['scenes', 'data-warehouse', 'external', 'sourceFormLogic']), + props({} as SourceFormProps), + connect({ + actions: [sourceModalLogic, ['onClear', 'toggleSourceModal', 'loadSources']], + }), + actions({ + onBack: true, + handleRedirect: (kind: string, searchParams: any) => ({ kind, searchParams }), + }), + listeners(({ actions }) => ({ + onBack: () => { + actions.resetExternalDataSource() + actions.onClear() + }, + submitExternalDataSourceSuccess: () => { + lemonToast.success('New Data Resource Created') + actions.toggleSourceModal(false) + actions.resetExternalDataSource() + actions.loadSources() + router.actions.push(urls.dataWarehouseSettings()) + }, + submitExternalDataSourceFailure: ({ error }) => { + lemonToast.error(error?.message || 'Something went wrong') + }, + handleRedirect: async ({ kind, searchParams }) => { + switch (kind) { + case 'hubspot': { + actions.setExternalDataSourceValue('payload', { + code: searchParams.code, + redirect_uri: getHubspotRedirectUri(), + }) + actions.setExternalDataSourceValue('source_type', 'Hubspot') + return + } + default: + lemonToast.error(`Something went wrong.`) + } + }, + })), + urlToAction(({ actions }) => ({ + '/data-warehouse/:kind/redirect': ({ kind = '' }, searchParams) => { + actions.handleRedirect(kind, searchParams) + }, + })), + forms(({ props }) => ({ + externalDataSource: { + defaults: { + prefix: '', + source_type: props.sourceType, + payload: getPayloadDefaults(props.sourceType), + } as ExternalDataSourceCreatePayload, + errors: getErrorsDefaults(props.sourceType), + submit: async (payload: ExternalDataSourceCreatePayload) => { + const newResource = await api.externalDataSources.create(payload) + return newResource + }, + }, + })), +]) diff --git a/frontend/src/scenes/data-warehouse/external/sourceModalLogic.ts b/frontend/src/scenes/data-warehouse/external/sourceModalLogic.ts index 13e6976b2f988..32ae739a5c4d3 100644 --- a/frontend/src/scenes/data-warehouse/external/sourceModalLogic.ts +++ 
b/frontend/src/scenes/data-warehouse/external/sourceModalLogic.ts @@ -1,19 +1,16 @@ -import { lemonToast } from '@posthog/lemon-ui' import { actions, connect, kea, listeners, path, reducers, selectors } from 'kea' -import { forms } from 'kea-forms' -import { router } from 'kea-router' -import api from 'lib/api' -import { urls } from 'scenes/urls' +import { preflightLogic } from 'scenes/PreflightCheck/preflightLogic' -import { ExternalDataStripeSourceCreatePayload } from '~/types' +import { ExternalDataSourceType } from '~/types' import { dataWarehouseTableLogic } from '../new_table/dataWarehouseTableLogic' import { dataWarehouseSettingsLogic } from '../settings/dataWarehouseSettingsLogic' import { dataWarehouseSceneLogic } from './dataWarehouseSceneLogic' import type { sourceModalLogicType } from './sourceModalLogicType' +export const getHubspotRedirectUri = (): string => `${window.location.origin}/data-warehouse/hubspot/redirect` export interface ConnectorConfigType { - name: string + name: ExternalDataSourceType fields: string[] caption: string disabledReason: string | null @@ -23,10 +20,16 @@ export interface ConnectorConfigType { export const CONNECTORS: ConnectorConfigType[] = [ { name: 'Stripe', - fields: ['accound_id', 'client_secret'], + fields: ['account_id', 'client_secret'], caption: 'Enter your Stripe credentials to link your Stripe to PostHog', disabledReason: null, }, + { + name: 'Hubspot', + fields: [], + caption: '', + disabledReason: null, + }, ] export const sourceModalLogic = kea([ @@ -34,9 +37,18 @@ export const sourceModalLogic = kea([ actions({ selectConnector: (connector: ConnectorConfigType | null) => ({ connector }), toggleManualLinkFormVisible: (visible: boolean) => ({ visible }), + handleRedirect: (kind: string, searchParams: any) => ({ kind, searchParams }), + onClear: true, }), connect({ - values: [dataWarehouseTableLogic, ['tableLoading'], dataWarehouseSettingsLogic, ['dataWarehouseSources']], + values: [ + dataWarehouseTableLogic, + 
['tableLoading'], + dataWarehouseSettingsLogic, + ['dataWarehouseSources'], + preflightLogic, + ['preflight'], + ], actions: [ dataWarehouseSceneLogic, ['toggleSourceModal'], @@ -77,37 +89,39 @@ export const sourceModalLogic = kea([ })) }, ], - }), - forms(() => ({ - externalDataSource: { - defaults: { - account_id: '', - client_secret: '', - prefix: '', - source_type: 'Stripe', - } as ExternalDataStripeSourceCreatePayload, - errors: ({ account_id, client_secret }) => { - return { - account_id: !account_id && 'Please enter an account id.', - client_secret: !client_secret && 'Please enter a client secret.', + addToHubspotButtonUrl: [ + (s) => [s.preflight], + (preflight) => { + return () => { + const clientId = preflight?.data_warehouse_integrations?.hubspot.client_id + + if (!clientId) { + return null + } + + const scopes = [ + 'crm.objects.contacts.read', + 'crm.objects.companies.read', + 'crm.objects.deals.read', + 'tickets', + 'crm.objects.quotes.read', + ] + + const params = new URLSearchParams() + params.set('client_id', clientId) + params.set('redirect_uri', getHubspotRedirectUri()) + params.set('scope', scopes.join(' ')) + + return `https://app.hubspot.com/oauth/authorize?${params.toString()}` } }, - submit: async (payload: ExternalDataStripeSourceCreatePayload) => { - const newResource = await api.externalDataSources.create(payload) - return newResource - }, - }, - })), + ], + }), listeners(({ actions }) => ({ - submitExternalDataSourceSuccess: () => { - lemonToast.success('New Data Resource Created') - actions.toggleSourceModal() - actions.resetExternalDataSource() - actions.loadSources() - router.actions.push(urls.dataWarehouseSettings()) - }, - submitExternalDataSourceFailure: ({ error }) => { - lemonToast.error(error?.message || 'Something went wrong') + onClear: () => { + actions.selectConnector(null) + actions.toggleManualLinkFormVisible(false) + actions.resetTable() }, })), ]) diff --git 
a/frontend/src/scenes/data-warehouse/redirect/DataWarehouseRedirectScene.tsx b/frontend/src/scenes/data-warehouse/redirect/DataWarehouseRedirectScene.tsx new file mode 100644 index 0000000000000..736132b3b747f --- /dev/null +++ b/frontend/src/scenes/data-warehouse/redirect/DataWarehouseRedirectScene.tsx @@ -0,0 +1,34 @@ +import { LemonButton, LemonInput } from '@posthog/lemon-ui' +import { Form } from 'kea-forms' +import { Field } from 'lib/forms/Field' +import { sourceFormLogic } from 'scenes/data-warehouse/external/sourceFormLogic' +import { SceneExport } from 'scenes/sceneTypes' + +export const scene: SceneExport = { + component: DataWarehouseRedirectScene, + logic: sourceFormLogic, +} + +export function DataWarehouseRedirectScene(): JSX.Element { + return ( +
+

Configure

+

Add a prefix to your tables to avoid conflicts with other data sources

+
+ + + + + Submit + +
+
+ ) +} + +export default DataWarehouseRedirectScene diff --git a/frontend/src/scenes/data-warehouse/settings/DataWarehouseSettingsScene.tsx b/frontend/src/scenes/data-warehouse/settings/DataWarehouseSettingsScene.tsx index c0b13232a6cdd..f2b5db2080e45 100644 --- a/frontend/src/scenes/data-warehouse/settings/DataWarehouseSettingsScene.tsx +++ b/frontend/src/scenes/data-warehouse/settings/DataWarehouseSettingsScene.tsx @@ -106,7 +106,7 @@ export function DataWarehouseSettingsScene(): JSX.Element { }, { title: 'Sync Frequency', - key: 'prefix', + key: 'frequency', render: function RenderFrequency() { return 'Every 24 hours' }, diff --git a/frontend/src/scenes/sceneTypes.ts b/frontend/src/scenes/sceneTypes.ts index 4604a64a697da..82c5ba9e1589c 100644 --- a/frontend/src/scenes/sceneTypes.ts +++ b/frontend/src/scenes/sceneTypes.ts @@ -43,6 +43,7 @@ export enum Scene { DataWarehouseSavedQueries = 'DataWarehouseSavedQueries', DataWarehouseTable = 'DataWarehouseTable', DataWarehouseSettings = 'DataWarehouseSettings', + DataWarehouseRedirect = 'DataWarehouseRedirect', OrganizationCreateFirst = 'OrganizationCreate', ProjectHomepage = 'ProjectHomepage', ProjectCreateFirst = 'ProjectCreate', diff --git a/frontend/src/scenes/scenes.ts b/frontend/src/scenes/scenes.ts index 4ebd425f741f0..596135d6fc68a 100644 --- a/frontend/src/scenes/scenes.ts +++ b/frontend/src/scenes/scenes.ts @@ -206,6 +206,9 @@ export const sceneConfigurations: Record = { name: 'Data warehouse settings', defaultDocsPath: '/docs/data-warehouse', }, + [Scene.DataWarehouseRedirect]: { + name: 'Data warehouse redirect', + }, [Scene.DataWarehouseTable]: { projectBased: true, name: 'Data warehouse table', @@ -490,6 +493,7 @@ export const routes: Record = { [urls.dataWarehouseExternal()]: Scene.DataWarehouseExternal, [urls.dataWarehouseSavedQueries()]: Scene.DataWarehouseSavedQueries, [urls.dataWarehouseSettings()]: Scene.DataWarehouseSettings, + [urls.dataWarehouseRedirect(':kind')]: Scene.DataWarehouseRedirect, 
[urls.featureFlags()]: Scene.FeatureFlags, [urls.featureFlag(':id')]: Scene.FeatureFlag, [urls.annotations()]: Scene.DataManagement, diff --git a/frontend/src/scenes/urls.ts b/frontend/src/scenes/urls.ts index bd5cbe5939aec..53c1250229d13 100644 --- a/frontend/src/scenes/urls.ts +++ b/frontend/src/scenes/urls.ts @@ -127,6 +127,7 @@ export const urls = { dataWarehouseExternal: (): string => '/data-warehouse/external', dataWarehouseSavedQueries: (): string => '/data-warehouse/views', dataWarehouseSettings: (): string => '/data-warehouse/settings', + dataWarehouseRedirect: (kind: string): string => `/data-warehouse/${kind}/redirect`, annotations: (): string => '/data-management/annotations', annotation: (id: AnnotationType['id'] | ':id'): string => `/data-management/annotations/${id}`, projectApps: (tab?: PluginTab): string => `/project/apps${tab ? `?tab=${tab}` : ''}`, diff --git a/frontend/src/types.ts b/frontend/src/types.ts index efdb944d97756..0f4109f6083b3 100644 --- a/frontend/src/types.ts +++ b/frontend/src/types.ts @@ -2503,6 +2503,11 @@ export interface PreflightStatus { available: boolean client_id?: string } + data_warehouse_integrations: { + hubspot: { + client_id?: string + } + } /** Whether PostHog is running in DEBUG mode. 
*/ is_debug?: boolean licensed_users_available?: number | null @@ -3331,13 +3336,13 @@ export interface DataWarehouseViewLink { from_join_key?: string } -export interface ExternalDataStripeSourceCreatePayload { - account_id: string - client_secret: string +export type ExternalDataSourceType = 'Stripe' | 'Hubspot' + +export interface ExternalDataSourceCreatePayload { + source_type: ExternalDataSourceType prefix: string - source_type: string + payload: Record } - export interface ExternalDataStripeSource { id: string source_id: string diff --git a/latest_migrations.manifest b/latest_migrations.manifest index 9deccea94b408..132521b9f10a3 100644 --- a/latest_migrations.manifest +++ b/latest_migrations.manifest @@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name ee: 0015_add_verified_properties otp_static: 0002_throttling otp_totp: 0002_auto_20190420_0723 -posthog: 0379_alter_scheduledchange +posthog: 0380_alter_externaldatasource_source_type sessions: 0001_initial social_django: 0010_uid_db_index two_factor: 0007_auto_20201201_1019 diff --git a/posthog/api/test/test_preflight.py b/posthog/api/test/test_preflight.py index 74d5a8b2e490b..6b63ad28f7542 100644 --- a/posthog/api/test/test_preflight.py +++ b/posthog/api/test/test_preflight.py @@ -29,6 +29,7 @@ def preflight_dict(self, options={}): "db": True, "initiated": True, "cloud": False, + "data_warehouse_integrations": {"hubspot": {"client_id": None}}, "demo": False, "clickhouse": True, "kafka": True, diff --git a/posthog/migrations/0380_alter_externaldatasource_source_type.py b/posthog/migrations/0380_alter_externaldatasource_source_type.py new file mode 100644 index 0000000000000..70ba4013f8013 --- /dev/null +++ b/posthog/migrations/0380_alter_externaldatasource_source_type.py @@ -0,0 +1,17 @@ +# Generated by Django 3.2.19 on 2023-12-29 18:53 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("posthog", "0379_alter_scheduledchange"), + ] + + 
operations = [ + migrations.AlterField( + model_name="externaldatasource", + name="source_type", + field=models.CharField(choices=[("Stripe", "Stripe"), ("Hubspot", "Hubspot")], max_length=128), + ), + ] diff --git a/posthog/settings/__init__.py b/posthog/settings/__init__.py index 7ee845e134694..3a3225e7deed5 100644 --- a/posthog/settings/__init__.py +++ b/posthog/settings/__init__.py @@ -39,7 +39,7 @@ from posthog.settings.object_storage import * from posthog.settings.temporal import * from posthog.settings.web import * -from posthog.settings.airbyte import * +from posthog.settings.data_warehouse import * from posthog.settings.utils import get_from_env, str_to_bool diff --git a/posthog/settings/airbyte.py b/posthog/settings/data_warehouse.py similarity index 75% rename from posthog/settings/airbyte.py rename to posthog/settings/data_warehouse.py index bcbcf2fefacb5..b53e16e570a13 100644 --- a/posthog/settings/airbyte.py +++ b/posthog/settings/data_warehouse.py @@ -8,3 +8,6 @@ # for DLT BUCKET_URL = os.getenv("BUCKET_URL", None) AIRBYTE_BUCKET_NAME = os.getenv("AIRBYTE_BUCKET_NAME", None) + +HUBSPOT_APP_CLIENT_ID = os.getenv("HUBSPOT_APP_CLIENT_ID", None) +HUBSPOT_APP_CLIENT_SECRET = os.getenv("HUBSPOT_APP_CLIENT_SECRET", None) diff --git a/posthog/temporal/data_imports/external_data_job.py b/posthog/temporal/data_imports/external_data_job.py index cdb218c0cce31..3f9f0096a7e76 100644 --- a/posthog/temporal/data_imports/external_data_job.py +++ b/posthog/temporal/data_imports/external_data_job.py @@ -155,6 +155,25 @@ async def run_external_data_job(inputs: ExternalDataJobInputs) -> None: source = stripe_source( api_key=stripe_secret_key, endpoints=tuple(inputs.schemas), job_id=str(model.id), team_id=inputs.team_id ) + elif model.pipeline.source_type == ExternalDataSource.Type.HUBSPOT: + from posthog.temporal.data_imports.pipelines.hubspot.auth import refresh_access_token + from posthog.temporal.data_imports.pipelines.hubspot import hubspot + + hubspot_access_code = 
model.pipeline.job_inputs.get("hubspot_secret_key", None) + refresh_token = model.pipeline.job_inputs.get("hubspot_refresh_token", None) + if not refresh_token: + raise ValueError(f"Hubspot refresh token not found for job {model.id}") + + if not hubspot_access_code: + hubspot_access_code = refresh_access_token(refresh_token) + + source = hubspot( + api_key=hubspot_access_code, + refresh_token=refresh_token, + job_id=str(model.id), + team_id=inputs.team_id, + endpoints=tuple(inputs.schemas), + ) else: raise ValueError(f"Source type {model.pipeline.source_type} not supported") diff --git a/posthog/temporal/data_imports/pipelines/hubspot/__init__.py b/posthog/temporal/data_imports/pipelines/hubspot/__init__.py new file mode 100644 index 0000000000000..3275071efe992 --- /dev/null +++ b/posthog/temporal/data_imports/pipelines/hubspot/__init__.py @@ -0,0 +1,146 @@ +""" +This is a module that provides a DLT source to retrieve data from multiple endpoints of the HubSpot API using a specified API key. The retrieved data is returned as a tuple of Dlt resources, one for each endpoint. + +The source retrieves data from the following endpoints: +- CRM Companies +- CRM Contacts +- CRM Deals +- CRM Tickets +- CRM Quotes +- Web Analytics Events + +For each endpoint, a resource and transformer function are defined to retrieve data and transform it to a common format. +The resource functions yield the raw data retrieved from the API, while the transformer functions are used to retrieve +additional information from the Web Analytics Events endpoint. + +The source also supports enabling Web Analytics Events for each endpoint by setting the corresponding enable flag to True. 
+ +Example: +To retrieve data from all endpoints, use the following code: + +python + +>>> resources = hubspot(api_key="hubspot_access_code") +""" + +from typing import Literal, Sequence, Iterator, Iterable + +import dlt +from dlt.common.typing import TDataItems +from dlt.sources import DltResource +from posthog.temporal.data_imports.pipelines.helpers import limit_paginated_generator + +from .helpers import ( + fetch_data, + _get_property_names, + fetch_property_history, +) +from .settings import ( + ALL, + CRM_OBJECT_ENDPOINTS, + DEFAULT_PROPS, + OBJECT_TYPE_PLURAL, + OBJECT_TYPE_SINGULAR, +) + +THubspotObjectType = Literal["company", "contact", "deal", "ticket", "quote"] + + +@dlt.source(name="hubspot") +def hubspot( + api_key: str, + refresh_token: str, + job_id: str, + team_id: int, + endpoints: Sequence[str] = ("companies", "contacts", "deals", "tickets", "quotes"), + include_history: bool = False, +) -> Iterable[DltResource]: + """ + A DLT source that retrieves data from the HubSpot API using the + specified API key. + + This function retrieves data for several HubSpot API endpoints, + including companies, contacts, deals, tickets and web + analytics events. It returns a tuple of Dlt resources, one for + each endpoint. + + Args: + api_key (Optional[str]): + The API key used to authenticate with the HubSpot API. Defaults + to dlt.secrets.value. + include_history (Optional[bool]): + Whether to load history of property changes along with entities. + The history entries are loaded to separate tables. + + Returns: + Sequence[DltResource]: Dlt resources, one for each HubSpot API endpoint. + + Notes: + This function uses the `fetch_data` function to retrieve data from the + HubSpot CRM API. The API key is passed to `fetch_data` as the + `api_key` argument. 
+ """ + + for endpoint in endpoints: + yield dlt.resource( + crm_objects, + name=endpoint, + write_disposition="append", + )( + object_type=OBJECT_TYPE_SINGULAR[endpoint], + api_key=api_key, + refresh_token=refresh_token, + include_history=include_history, + props=DEFAULT_PROPS[endpoint], + include_custom_props=True, + job_id=job_id, + team_id=team_id, + ) + + +@limit_paginated_generator +def crm_objects( + object_type: str, + api_key: str, + refresh_token: str, + include_history: bool, + props: Sequence[str], + include_custom_props: bool = True, +) -> Iterator[TDataItems]: + """Building blocks for CRM resources.""" + if props == ALL: + props = list(_get_property_names(api_key, refresh_token, object_type)) + + if include_custom_props: + all_props = _get_property_names(api_key, refresh_token, object_type) + custom_props = [prop for prop in all_props if not prop.startswith("hs_")] + props = props + custom_props # type: ignore + + props = ",".join(sorted(list(set(props)))) + + if len(props) > 2000: + raise ValueError( + ( + "Your request to Hubspot is too long to process. " + "Maximum allowed query length is 2000 symbols, while " + f"your list of properties `{props[:200]}`... is {len(props)} " + "symbols long. Use the `props` argument of the resource to " + "set the list of properties to extract from the endpoint." 
+ ) + ) + + params = {"properties": props, "limit": 100} + + yield from fetch_data(CRM_OBJECT_ENDPOINTS[object_type], api_key, refresh_token, params=params) + if include_history: + # Get history separately, as requesting both all properties and history together + # is likely to hit hubspot's URL length limit + for history_entries in fetch_property_history( + CRM_OBJECT_ENDPOINTS[object_type], + api_key, + props, + ): + yield dlt.mark.with_table_name( + history_entries, + OBJECT_TYPE_PLURAL[object_type] + "_property_history", + ) diff --git a/posthog/temporal/data_imports/pipelines/hubspot/auth.py b/posthog/temporal/data_imports/pipelines/hubspot/auth.py new file mode 100644 index 0000000000000..490552cfe237d --- /dev/null +++ b/posthog/temporal/data_imports/pipelines/hubspot/auth.py @@ -0,0 +1,42 @@ +import requests +from django.conf import settings +from typing import Tuple + + +def refresh_access_token(refresh_token: str) -> str: + res = requests.post( + "https://api.hubapi.com/oauth/v1/token", + data={ + "grant_type": "refresh_token", + "client_id": settings.HUBSPOT_APP_CLIENT_ID, + "client_secret": settings.HUBSPOT_APP_CLIENT_SECRET, + "refresh_token": refresh_token, + }, + ) + + if res.status_code != 200: + err_message = res.json()["message"] + raise Exception(err_message) + + return res.json()["access_token"] + + +def get_access_token_from_code(code: str, redirect_uri: str) -> Tuple[str, str]: + res = requests.post( + "https://api.hubapi.com/oauth/v1/token", + data={ + "grant_type": "authorization_code", + "client_id": settings.HUBSPOT_APP_CLIENT_ID, + "client_secret": settings.HUBSPOT_APP_CLIENT_SECRET, + "redirect_uri": redirect_uri, + "code": code, + }, + ) + + if res.status_code != 200: + err_message = res.json()["message"] + raise Exception(err_message) + + payload = res.json() + + return payload["access_token"], payload["refresh_token"] diff --git a/posthog/temporal/data_imports/pipelines/hubspot/helpers.py 
b/posthog/temporal/data_imports/pipelines/hubspot/helpers.py
new file mode 100644
index 0000000000000..b724368fe7e40
--- /dev/null
+++ b/posthog/temporal/data_imports/pipelines/hubspot/helpers.py
@@ -0,0 +1,211 @@
+"""Hubspot source helpers"""
+
+import urllib.parse
+from typing import Iterator, Dict, Any, List, Optional, Tuple
+
+from dlt.sources.helpers import requests
+import requests as http_requests
+from .settings import OBJECT_TYPE_PLURAL
+from .auth import refresh_access_token
+
+BASE_URL = "https://api.hubapi.com/"
+
+
+def get_url(endpoint: str) -> str:
+    """Get absolute hubspot endpoint URL"""
+    return urllib.parse.urljoin(BASE_URL, endpoint)
+
+
+def _get_headers(api_key: str) -> Dict[str, str]:
+    """
+    Return a dictionary of HTTP headers to use for API requests, including the specified API key.
+
+    Args:
+        api_key (str): The API key to use for authentication, as a string.
+
+    Returns:
+        dict: A dictionary of HTTP headers to include in API requests, with the `Authorization` header
+            set to the specified API key in the format `Bearer {api_key}`.
+
+    """
+    return {"authorization": f"Bearer {api_key}"}
+
+
+def _get_with_token_refresh(
+    url: str, api_key: str, refresh_token: Optional[str], params: Optional[Dict[str, Any]] = None
+) -> Tuple[http_requests.Response, str]:
+    """
+    GET `url`, refreshing the access token once and retrying if the API answers 401.
+
+    Args:
+        url: Absolute URL to request.
+        api_key: The access token to try first.
+        refresh_token: Token used to obtain a new access token after a 401. When empty, the 401 is re-raised.
+        params: Optional dict of query params to include in the request
+
+    Returns:
+        A `(response, api_key)` tuple. `api_key` is the token the successful request was made with, so
+        callers can keep using a refreshed token for the following pages.
+
+    Raises:
+        requests.exceptions.HTTPError: For any HTTP error status that is not a refreshable 401.
+    """
+    try:
+        return requests.get(url, headers=_get_headers(api_key), params=params), api_key
+    except http_requests.exceptions.HTTPError as e:
+        if not refresh_token or e.response is None or e.response.status_code != 401:
+            raise
+    api_key = refresh_access_token(refresh_token)
+    return requests.get(url, headers=_get_headers(api_key), params=params), api_key
+
+
+def extract_property_history(objects: List[Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
+    """
+    Flatten the `propertiesWithHistory` of each object into one entry per recorded change.
+
+    Each yielded dict is the change entry itself plus `object_id` (the object's `id`) and
+    `property_name`.
+    """
+    for item in objects:
+        history = item.get("propertiesWithHistory")
+        if not history:
+            # Skip only this object. A `return` here would end the generator and silently drop the
+            # history of every object that comes after it in the page.
+            continue
+        for key, changes in history.items():
+            if not changes:
+                continue
+            for entry in changes:
+                yield {"object_id": item["id"], "property_name": key, **entry}
+
+
+def fetch_property_history(
+    endpoint: str,
+    api_key: str,
+    props: str,
+    params: Optional[Dict[str, Any]] = None,
+    refresh_token: Optional[str] = None,
+) -> Iterator[List[Dict[str, Any]]]:
+    """Fetch property history from the given CRM endpoint.
+
+    Args:
+        endpoint: The endpoint to fetch data from, as a string.
+        api_key: The API key to use for authentication, as a string.
+        props: A comma separated list of properties to retrieve the history for
+        params: Optional dict of query params to include in the request
+        refresh_token: Optional token used to renew `api_key` when a request is rejected with a 401.
+            Without it a 401 is raised to the caller, as before.
+
+    Yields:
+        List of property history entries (dicts)
+    """
+    url = get_url(endpoint)
+
+    # Copy so the caller's dict is not modified
+    params = dict(params or {})
+    params["propertiesWithHistory"] = props
+    params["limit"] = 50
+
+    r, api_key = _get_with_token_refresh(url, api_key, refresh_token, params=params)
+    _data = r.json()
+    while _data is not None:
+        if "results" in _data:
+            yield list(extract_property_history(_data["results"]))
+
+        # Follow pagination links if they exist
+        _next = _data.get("paging", {}).get("next", None)
+        if _next:
+            r, api_key = _get_with_token_refresh(_next["link"], api_key, refresh_token)
+            _data = r.json()
+        else:
+            _data = None
+
+
+def fetch_data(
+    endpoint: str, api_key: str, refresh_token: str, params: Optional[Dict[str, Any]] = None
+) -> Iterator[List[Dict[str, Any]]]:
+    """
+    Fetch data from HUBSPOT endpoint using a specified API key and yield the properties of each result.
+    For paginated endpoint this function yields item from all pages.
+
+    Args:
+        endpoint (str): The endpoint to fetch data from, as a string.
+        api_key (str): The API key to use for authentication, as a string.
+        refresh_token (str): Token used to obtain a new API key when a request is rejected with a 401.
+        params: Optional dict of query params to include in the request
+
+    Yields:
+        A List of CRM object dicts
+
+    Raises:
+        requests.exceptions.HTTPError: If the API returns an HTTP error status code.
+
+    Notes:
+        This function uses the `requests` library to make a GET request to the specified endpoint, with
+        the API key included in the headers. If the API returns a non-successful HTTP status code (e.g.
+        404 Not Found), a `requests.exceptions.HTTPError` exception will be raised.
+
+        The `endpoint` argument should be a relative URL, which will be appended to the base URL for the
+        API. The `params` argument is used to pass additional query parameters to the request
+
+        A 401 on any page, not only the first, triggers one token refresh and a retry of that page, so
+        a sync that outlives the access token can still finish.
+    """
+    url = get_url(endpoint)
+
+    r, api_key = _get_with_token_refresh(url, api_key, refresh_token, params=params)
+    _data = r.json()
+    # Yield the properties of each result in the API response
+    while _data is not None:
+        if "results" in _data:
+            _objects: List[Dict[str, Any]] = []
+            for _result in _data["results"]:
+                _obj = _result.get("properties", _result)
+                if "id" not in _obj and "id" in _result:
+                    # Move id from properties to top level
+                    _obj["id"] = _result["id"]
+                if "associations" in _result:
+                    for association in _result["associations"]:
+                        __values = [
+                            {
+                                "value": _obj["hs_object_id"],
+                                f"{association}_id": __r["id"],
+                            }
+                            for __r in _result["associations"][association]["results"]
+                        ]
+
+                        # remove duplicates from list of dicts
+                        __values = [dict(t) for t in {tuple(d.items()) for d in __values}]
+
+                        _obj[association] = __values
+                _objects.append(_obj)
+
+            yield _objects
+
+        # Follow pagination links if they exist
+        _next = _data.get("paging", {}).get("next", None)
+        if _next:
+            r, api_key = _get_with_token_refresh(_next["link"], api_key, refresh_token)
+            _data = r.json()
+        else:
+            _data = None
+
+
+def _get_property_names(api_key: str, refresh_token: str, object_type: str) -> List[str]:
+    """
+    Retrieve property names for a given entity from the HubSpot API.
+
+    Args:
+        api_key: The API key to use for authentication.
+        refresh_token: Token used to obtain a new API key when a request is rejected with a 401.
+        object_type: Singular object type (a key of `OBJECT_TYPE_PLURAL`) whose property names to retrieve.
+
+    Returns:
+        A list of property names.
+
+    Raises:
+        Exception: If an error occurs during the API request.
+    """
+    endpoint = f"/crm/v3/properties/{OBJECT_TYPE_PLURAL[object_type]}"
+
+    return [prop["name"] for page in fetch_data(endpoint, api_key, refresh_token) for prop in page]
diff --git a/posthog/temporal/data_imports/pipelines/hubspot/settings.py b/posthog/temporal/data_imports/pipelines/hubspot/settings.py
new file mode 100644
index 0000000000000..10af4c47b5a31
--- /dev/null
+++ b/posthog/temporal/data_imports/pipelines/hubspot/settings.py
@@ -0,0 +1,106 @@
+"""Hubspot source settings and constants"""
+
+from dlt.common import pendulum
+
+STARTDATE = pendulum.datetime(year=2000, month=1, day=1)
+
+CONTACT = "contact"
+COMPANY = "company"
+DEAL = "deal"
+TICKET = "ticket"
+QUOTE = "quote"
+
+CRM_CONTACTS_ENDPOINT = "/crm/v3/objects/contacts?associations=deals,tickets,quotes"
+CRM_COMPANIES_ENDPOINT = "/crm/v3/objects/companies?associations=contacts,deals,tickets,quotes"
+CRM_DEALS_ENDPOINT = "/crm/v3/objects/deals"
+CRM_TICKETS_ENDPOINT = "/crm/v3/objects/tickets"
+CRM_QUOTES_ENDPOINT = "/crm/v3/objects/quotes"
+
+CRM_OBJECT_ENDPOINTS = {
+    CONTACT: CRM_CONTACTS_ENDPOINT,
+    COMPANY: CRM_COMPANIES_ENDPOINT,
+    DEAL: CRM_DEALS_ENDPOINT,
+    TICKET: CRM_TICKETS_ENDPOINT,
+    QUOTE: CRM_QUOTES_ENDPOINT,
+}
+
+WEB_ANALYTICS_EVENTS_ENDPOINT = "/events/v3/events?objectType={objectType}&objectId={objectId}&occurredAfter={occurredAfter}&occurredBefore={occurredBefore}&sort=-occurredAt"
+
+OBJECT_TYPE_SINGULAR = {
+    "companies": COMPANY,
+    "contacts": CONTACT,
+    "deals": DEAL,
+    "tickets": TICKET,
+    "quotes": QUOTE,
+}
+
+OBJECT_TYPE_PLURAL = {v: k for k, v in OBJECT_TYPE_SINGULAR.items()}
+ + +ENDPOINTS = ( + OBJECT_TYPE_PLURAL[CONTACT], + OBJECT_TYPE_PLURAL[DEAL], + OBJECT_TYPE_PLURAL[COMPANY], + OBJECT_TYPE_PLURAL[TICKET], + OBJECT_TYPE_PLURAL[QUOTE], +) + +DEFAULT_DEAL_PROPS = [ + "amount", + "closedate", + "createdate", + "dealname", + "dealstage", + "hs_lastmodifieddate", + "hs_object_id", + "pipeline", +] + +DEFAULT_COMPANY_PROPS = [ + "createdate", + "domain", + "hs_lastmodifieddate", + "hs_object_id", + "name", +] + +DEFAULT_CONTACT_PROPS = [ + "createdate", + "email", + "firstname", + "hs_object_id", + "lastmodifieddate", + "lastname", +] + +DEFAULT_TICKET_PROPS = [ + "createdate", + "content", + "hs_lastmodifieddate", + "hs_object_id", + "hs_pipeline", + "hs_pipeline_stage", + "hs_ticket_category", + "hs_ticket_priority", + "subject", +] + +DEFAULT_QUOTE_PROPS = [ + "hs_createdate", + "hs_expiration_date", + "hs_lastmodifieddate", + "hs_object_id", + "hs_public_url_key", + "hs_status", + "hs_title", +] + +DEFAULT_PROPS = { + OBJECT_TYPE_PLURAL[CONTACT]: DEFAULT_CONTACT_PROPS, + OBJECT_TYPE_PLURAL[COMPANY]: DEFAULT_COMPANY_PROPS, + OBJECT_TYPE_PLURAL[DEAL]: DEFAULT_DEAL_PROPS, + OBJECT_TYPE_PLURAL[TICKET]: DEFAULT_TICKET_PROPS, + OBJECT_TYPE_PLURAL[QUOTE]: DEFAULT_QUOTE_PROPS, +} + +ALL = ("ALL",) diff --git a/posthog/temporal/data_imports/pipelines/schemas.py b/posthog/temporal/data_imports/pipelines/schemas.py index a62db7d664e40..eaaa431d7aef9 100644 --- a/posthog/temporal/data_imports/pipelines/schemas.py +++ b/posthog/temporal/data_imports/pipelines/schemas.py @@ -1,4 +1,8 @@ from posthog.warehouse.models import ExternalDataSource -from posthog.temporal.data_imports.pipelines.stripe.settings import ENDPOINTS +from posthog.temporal.data_imports.pipelines.stripe.settings import ENDPOINTS as STRIPE_ENDPOINTS +from posthog.temporal.data_imports.pipelines.hubspot.settings import ENDPOINTS as HUBSPOT_ENDPOINTS -PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING = {ExternalDataSource.Type.STRIPE: ENDPOINTS} +PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING = { + 
ExternalDataSource.Type.STRIPE: STRIPE_ENDPOINTS, + ExternalDataSource.Type.HUBSPOT: HUBSPOT_ENDPOINTS, +} diff --git a/posthog/views.py b/posthog/views.py index 4750cf170cc27..1f757f9833734 100644 --- a/posthog/views.py +++ b/posthog/views.py @@ -92,6 +92,7 @@ def security_txt(request): @never_cache def preflight_check(request: HttpRequest) -> JsonResponse: slack_client_id = SlackIntegration.slack_config().get("SLACK_APP_CLIENT_ID") + hubspot_client_id = settings.HUBSPOT_APP_CLIENT_ID response = { "django": True, @@ -113,6 +114,7 @@ def preflight_check(request: HttpRequest) -> JsonResponse: "available": bool(slack_client_id), "client_id": slack_client_id or None, }, + "data_warehouse_integrations": {"hubspot": {"client_id": hubspot_client_id}}, "object_storage": is_cloud() or is_object_storage_available(), } diff --git a/posthog/warehouse/api/external_data_source.py b/posthog/warehouse/api/external_data_source.py index 48f8babed4a5a..843821e2f2749 100644 --- a/posthog/warehouse/api/external_data_source.py +++ b/posthog/warehouse/api/external_data_source.py @@ -25,6 +25,9 @@ from posthog.temporal.data_imports.pipelines.schemas import ( PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING, ) +from posthog.temporal.data_imports.pipelines.hubspot.auth import ( + get_access_token_from_code, +) import temporalio logger = structlog.get_logger(__name__) @@ -107,7 +110,6 @@ def get_queryset(self): ) def create(self, request: Request, *args: Any, **kwargs: Any) -> Response: - client_secret = request.data["client_secret"] prefix = request.data.get("prefix", None) source_type = request.data["source_type"] @@ -127,18 +129,12 @@ def create(self, request: Request, *args: Any, **kwargs: Any) -> Response: ) # TODO: remove dummy vars - new_source_model = ExternalDataSource.objects.create( - source_id=str(uuid.uuid4()), - connection_id=str(uuid.uuid4()), - destination_id=str(uuid.uuid4()), - team=self.team, - status="Running", - source_type=source_type, - job_inputs={ - "stripe_secret_key": 
client_secret, - }, - prefix=prefix, - ) + if source_type == ExternalDataSource.Type.STRIPE: + new_source_model = self._handle_stripe_source(request, *args, **kwargs) + elif source_type == ExternalDataSource.Type.HUBSPOT: + new_source_model = self._handle_hubspot_source(request, *args, **kwargs) + else: + raise NotImplementedError(f"Source type {source_type} not implemented") schemas = PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING[source_type] for schema in schemas: @@ -156,6 +152,54 @@ def create(self, request: Request, *args: Any, **kwargs: Any) -> Response: return Response(status=status.HTTP_201_CREATED, data={"id": new_source_model.pk}) + def _handle_stripe_source(self, request: Request, *args: Any, **kwargs: Any) -> ExternalDataSource: + payload = request.data["payload"] + client_secret = payload.get("client_secret") + prefix = request.data.get("prefix", None) + source_type = request.data["source_type"] + + # TODO: remove dummy vars + new_source_model = ExternalDataSource.objects.create( + source_id=str(uuid.uuid4()), + connection_id=str(uuid.uuid4()), + destination_id=str(uuid.uuid4()), + team=self.team, + status="Running", + source_type=source_type, + job_inputs={ + "stripe_secret_key": client_secret, + }, + prefix=prefix, + ) + + return new_source_model + + def _handle_hubspot_source(self, request: Request, *args: Any, **kwargs: Any) -> ExternalDataSource: + payload = request.data["payload"] + code = payload.get("code") + redirect_uri = payload.get("redirect_uri") + prefix = request.data.get("prefix", None) + source_type = request.data["source_type"] + + access_token, refresh_token = get_access_token_from_code(code, redirect_uri=redirect_uri) + + # TODO: remove dummy vars + new_source_model = ExternalDataSource.objects.create( + source_id=str(uuid.uuid4()), + connection_id=str(uuid.uuid4()), + destination_id=str(uuid.uuid4()), + team=self.team, + status="Running", + source_type=source_type, + job_inputs={ + "hubspot_secret_key": access_token, + 
"hubspot_refresh_token": refresh_token, + }, + prefix=prefix, + ) + + return new_source_model + def prefix_required(self, source_type: str) -> bool: source_type_exists = ExternalDataSource.objects.filter(team_id=self.team.pk, source_type=source_type).exists() return source_type_exists diff --git a/posthog/warehouse/api/test/test_external_data_source.py b/posthog/warehouse/api/test/test_external_data_source.py index 2ad741b453a29..955c032c0373e 100644 --- a/posthog/warehouse/api/test/test_external_data_source.py +++ b/posthog/warehouse/api/test/test_external_data_source.py @@ -30,7 +30,7 @@ def _create_external_data_schema(self, source_id) -> ExternalDataSchema: def test_create_external_data_source(self): response = self.client.post( f"/api/projects/{self.team.id}/external_data_sources/", - data={"source_type": "Stripe", "client_secret": "sk_test_123"}, + data={"source_type": "Stripe", "payload": {"client_secret": "sk_test_123"}}, ) payload = response.json() @@ -46,7 +46,7 @@ def test_prefix_external_data_source(self): response = self.client.post( f"/api/projects/{self.team.id}/external_data_sources/", - data={"source_type": "Stripe", "client_secret": "sk_test_123"}, + data={"source_type": "Stripe", "payload": {"client_secret": "sk_test_123"}}, ) self.assertEqual(response.status_code, 201) @@ -54,7 +54,7 @@ def test_prefix_external_data_source(self): response = self.client.post( f"/api/projects/{self.team.id}/external_data_sources/", - data={"source_type": "Stripe", "client_secret": "sk_test_123"}, + data={"source_type": "Stripe", "payload": {"client_secret": "sk_test_123"}}, ) self.assertEqual(response.status_code, 400) @@ -63,7 +63,7 @@ def test_prefix_external_data_source(self): # Create with prefix response = self.client.post( f"/api/projects/{self.team.id}/external_data_sources/", - data={"source_type": "Stripe", "client_secret": "sk_test_123", "prefix": "test_"}, + data={"source_type": "Stripe", "payload": {"client_secret": "sk_test_123"}, "prefix": "test_"}, 
) self.assertEqual(response.status_code, 201) @@ -71,7 +71,7 @@ def test_prefix_external_data_source(self): # Try to create same type with same prefix again response = self.client.post( f"/api/projects/{self.team.id}/external_data_sources/", - data={"source_type": "Stripe", "client_secret": "sk_test_123", "prefix": "test_"}, + data={"source_type": "Stripe", "payload": {"client_secret": "sk_test_123"}, "prefix": "test_"}, ) self.assertEqual(response.status_code, 400) diff --git a/posthog/warehouse/models/external_data_source.py b/posthog/warehouse/models/external_data_source.py index 287a4a3f2cd99..5d8f736a77b94 100644 --- a/posthog/warehouse/models/external_data_source.py +++ b/posthog/warehouse/models/external_data_source.py @@ -8,6 +8,7 @@ class ExternalDataSource(CreatedMetaFields, UUIDModel): class Type(models.TextChoices): STRIPE = "Stripe", "Stripe" + HUBSPOT = "Hubspot", "Hubspot" class Status(models.TextChoices): RUNNING = "Running", "Running"