From f7f1408d567d1a9a00b51adf7562052854451891 Mon Sep 17 00:00:00 2001 From: Eric Duong Date: Fri, 26 Jan 2024 11:01:10 -0500 Subject: [PATCH] chore(data-warehouse): postgres integration (#19797) * basics * add db schema endpoint * frontend progress * form working * working * schema selection * refactor frontend * add team * format api.ts * typing * add internal network filter * typing * fix codescan * typing * fix test * add schemas * update feature flag from hubspot to postgres * remove sslmode * cleanup * fix test * fix test * add logo --- frontend/public/postgres-logo.svg | 22 ++ frontend/src/lib/api.ts | 17 + frontend/src/lib/constants.tsx | 2 +- .../data-warehouse/external/SourceModal.tsx | 202 ++++++------ ...neLogic.tsx => dataWarehouseSceneLogic.ts} | 0 .../external/forms/PostgresSchemaForm.tsx | 45 +++ .../external/forms/SourceForm.tsx | 33 ++ .../sourceFormLogic.ts} | 124 ++++---- .../external/sourceModalLogic.ts | 104 ------ .../external/sourceModalLogic.tsx | 295 ++++++++++++++++++ .../new_table/DataWarehouseTableForm.tsx | 7 +- .../redirect/DataWarehouseRedirectScene.tsx | 2 +- frontend/src/types.ts | 7 +- latest_migrations.manifest | 2 +- ...87_alter_externaldatasource_source_type.py | 19 ++ .../data_imports/external_data_job.py | 39 ++- .../pipelines/postgres/__init__.py | 72 +++++ .../pipelines/postgres/helpers.py | 124 ++++++++ .../pipelines/postgres/settings.py | 3 + .../data_imports/pipelines/schemas.py | 1 + .../temporal/tests/external_data/conftest.py | 71 +++++ .../test_external_data_job.py | 116 ++++++- posthog/warehouse/api/external_data_source.py | 99 +++++- .../api/test/test_external_data_source.py | 124 ++++++++ .../warehouse/models/external_data_schema.py | 26 +- .../warehouse/models/external_data_source.py | 1 + 26 files changed, 1263 insertions(+), 294 deletions(-) create mode 100644 frontend/public/postgres-logo.svg rename frontend/src/scenes/data-warehouse/external/{dataWarehouseSceneLogic.tsx => dataWarehouseSceneLogic.ts} (100%) create mode 100644 frontend/src/scenes/data-warehouse/external/forms/PostgresSchemaForm.tsx create mode 100644 frontend/src/scenes/data-warehouse/external/forms/SourceForm.tsx rename frontend/src/scenes/data-warehouse/external/{sourceFormLogic.tsx => forms/sourceFormLogic.ts} (61%) delete mode 100644 frontend/src/scenes/data-warehouse/external/sourceModalLogic.ts create mode 100644 frontend/src/scenes/data-warehouse/external/sourceModalLogic.tsx create mode 100644 posthog/migrations/0387_alter_externaldatasource_source_type.py create mode 100644 posthog/temporal/data_imports/pipelines/postgres/__init__.py create mode 100644 posthog/temporal/data_imports/pipelines/postgres/helpers.py create mode 100644 posthog/temporal/data_imports/pipelines/postgres/settings.py create mode 100644 posthog/temporal/tests/external_data/conftest.py rename posthog/temporal/tests/{ => external_data}/test_external_data_job.py (81%) diff --git a/frontend/public/postgres-logo.svg b/frontend/public/postgres-logo.svg new file mode 100644 index 0000000000000..6b65997a98d5e --- /dev/null +++ b/frontend/public/postgres-logo.svg @@ -0,0 +1,22 @@ + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/frontend/src/lib/api.ts b/frontend/src/lib/api.ts index efb943fbe7d1d..aba1f43b325a3 100644 --- a/frontend/src/lib/api.ts +++ b/frontend/src/lib/api.ts @@ -31,6 +31,7 @@ import { EventType, Experiment, ExportedAssetType, + ExternalDataPostgresSchema, ExternalDataSourceCreatePayload, ExternalDataSourceSchema, ExternalDataStripeSource, @@ 
-1826,6 +1827,22 @@ const api = { async reload(sourceId: ExternalDataStripeSource['id']): Promise { await new ApiRequest().externalDataSource(sourceId).withAction('reload').create() }, + async database_schema( + host: string, + port: string, + dbname: string, + user: string, + password: string, + schema: string + ): Promise { + const queryParams = toParams({ host, port, dbname, user, password, schema }) + + return await new ApiRequest() + .externalDataSources() + .withAction('database_schema') + .withQueryString(queryParams) + .get() + }, }, externalDataSchemas: { diff --git a/frontend/src/lib/constants.tsx b/frontend/src/lib/constants.tsx index e26389fa1d4c6..2ae32992808b4 100644 --- a/frontend/src/lib/constants.tsx +++ b/frontend/src/lib/constants.tsx @@ -159,7 +159,7 @@ export const FEATURE_FLAGS = { EXCEPTION_AUTOCAPTURE: 'exception-autocapture', DATA_WAREHOUSE: 'data-warehouse', // owner: @EDsCODE DATA_WAREHOUSE_VIEWS: 'data-warehouse-views', // owner: @EDsCODE - DATA_WAREHOUSE_HUBSPOT_IMPORT: 'data-warehouse-hubspot-import', // owner: @EDsCODE + DATA_WAREHOUSE_POSTGRES_IMPORT: 'data-warehouse-postgres-import', // owner: @EDsCODE FF_DASHBOARD_TEMPLATES: 'ff-dashboard-templates', // owner: @EDsCODE SHOW_PRODUCT_INTRO_EXISTING_PRODUCTS: 'show-product-intro-existing-products', // owner: @raquelmsmith ARTIFICIAL_HOG: 'artificial-hog', // owner: @Twixes diff --git a/frontend/src/scenes/data-warehouse/external/SourceModal.tsx b/frontend/src/scenes/data-warehouse/external/SourceModal.tsx index 824b748af255f..ae546ea9eae21 100644 --- a/frontend/src/scenes/data-warehouse/external/SourceModal.tsx +++ b/frontend/src/scenes/data-warehouse/external/SourceModal.tsx @@ -1,45 +1,106 @@ -import { LemonButton, LemonDivider, LemonInput, LemonModal, LemonModalProps, Link } from '@posthog/lemon-ui' +import { LemonButton, LemonModal, LemonModalProps } from '@posthog/lemon-ui' import { useActions, useValues } from 'kea' -import { Form } from 'kea-forms' import { FEATURE_FLAGS } from 'lib/constants' -import { Field } from 'lib/forms/Field' import { featureFlagLogic } from 'lib/logic/featureFlagLogic' import hubspotLogo from 'public/hubspot-logo.svg' +import postgresLogo from 'public/postgres-logo.svg' import stripeLogo from 'public/stripe-logo.svg' -import { ExternalDataSourceType } from '~/types' - import { DatawarehouseTableForm } from '../new_table/DataWarehouseTableForm' -import { SOURCE_DETAILS, SourceConfig, sourceFormLogic } from './sourceFormLogic' +import PostgresSchemaForm from './forms/PostgresSchemaForm' +import SourceForm from './forms/SourceForm' +import { SourceConfig } from './sourceModalLogic' import { sourceModalLogic } from './sourceModalLogic' interface SourceModalProps extends LemonModalProps {} export default function SourceModal(props: SourceModalProps): JSX.Element { - const { tableLoading, selectedConnector, isManualLinkFormVisible, connectors, addToHubspotButtonUrl } = - useValues(sourceModalLogic) - const { selectConnector, toggleManualLinkFormVisible, onClear } = useActions(sourceModalLogic) + const { modalTitle, modalCaption } = useValues(sourceModalLogic) + const { onClear, onBack, onSubmit } = useActions(sourceModalLogic) + const { currentStep } = useValues(sourceModalLogic) + + const footer = (): JSX.Element | null => { + if (currentStep === 1) { + return null + } + + return ( +
+            <div className="mt-2 flex flex-row justify-end gap-2">
+                <LemonButton type="secondary" center onClick={onBack}>
+                    Back
+                </LemonButton>
+                <LemonButton type="primary" center onClick={() => onSubmit()} data-attr="source-link">
+                    Link
+                </LemonButton>
+            </div>
+ ) + } + + return ( + onClear()} + title={modalTitle} + description={modalCaption} + footer={footer()} + > + + + + + ) +} + +interface ModalPageProps { + page: number + children?: React.ReactNode +} + +function ModalPage({ children, page }: ModalPageProps): JSX.Element { + const { currentStep } = useValues(sourceModalLogic) + + if (currentStep !== page) { + return <> + } + + return
<div>{children}</div>
+} + +function FirstStep(): JSX.Element { + const { connectors, addToHubspotButtonUrl } = useValues(sourceModalLogic) + const { selectConnector, toggleManualLinkFormVisible, onNext } = useActions(sourceModalLogic) const { featureFlags } = useValues(featureFlagLogic) const MenuButton = (config: SourceConfig): JSX.Element => { const onClick = (): void => { selectConnector(config) + onNext() } if (config.name === 'Stripe') { return ( - + stripe logo ) } - if (config.name === 'Hubspot' && featureFlags[FEATURE_FLAGS.DATA_WAREHOUSE_HUBSPOT_IMPORT]) { + if (config.name === 'Hubspot') { return ( - - - hubspot logo - - + + hubspot logo + + ) + } + + if (config.name === 'Postgres' && featureFlags[FEATURE_FLAGS.DATA_WAREHOUSE_POSTGRES_IMPORT]) { + return ( + +
+                <LemonButton onClick={onClick} center type="secondary">
+                    <div className="flex flex-row gap-2 items-center">
+                        <img src={postgresLogo} alt="postgres logo" />
+                        <span>Postgres</span>
+                    </div>
+                </LemonButton>
+
+
) } @@ -48,110 +109,37 @@ export default function SourceModal(props: SourceModalProps): JSX.Element { const onManualLinkClick = (): void => { toggleManualLinkFormVisible(true) + onNext() } - const formToShow = (): JSX.Element => { - if (selectedConnector) { - return - } - - if (isManualLinkFormVisible) { - return ( -
- - -
- - Back - - - Link - -
- - } - /> -
- ) - } - - return ( -
+ return ( + +
{connectors.map((config, index) => ( ))} - + Manual Link
- ) - } - - return ( - onClear()} - title={selectedConnector ? 'Link ' + selectedConnector.name : 'Select source to link'} - description={selectedConnector ? selectedConnector.caption : null} - > - {formToShow()} - +
) } -interface SourceFormProps { - sourceType: ExternalDataSourceType -} +function SecondStep(): JSX.Element { + const { selectedConnector } = useValues(sourceModalLogic) -function SourceForm({ sourceType }: SourceFormProps): JSX.Element { - const logic = sourceFormLogic({ sourceType }) - const { isExternalDataSourceSubmitting } = useValues(logic) - const { onBack } = useActions(logic) + return ( + + {selectedConnector ? : } + + ) +} +function ThirdStep(): JSX.Element { return ( -
- {SOURCE_DETAILS[sourceType].fields.map((field) => ( - - - - ))} - - - - -
- - Back - - - Link - -
- + + + ) } diff --git a/frontend/src/scenes/data-warehouse/external/dataWarehouseSceneLogic.tsx b/frontend/src/scenes/data-warehouse/external/dataWarehouseSceneLogic.ts similarity index 100% rename from frontend/src/scenes/data-warehouse/external/dataWarehouseSceneLogic.tsx rename to frontend/src/scenes/data-warehouse/external/dataWarehouseSceneLogic.ts diff --git a/frontend/src/scenes/data-warehouse/external/forms/PostgresSchemaForm.tsx b/frontend/src/scenes/data-warehouse/external/forms/PostgresSchemaForm.tsx new file mode 100644 index 0000000000000..423ca4fdbe016 --- /dev/null +++ b/frontend/src/scenes/data-warehouse/external/forms/PostgresSchemaForm.tsx @@ -0,0 +1,45 @@ +import { LemonSwitch, LemonTable } from '@posthog/lemon-ui' +import { useActions, useMountedLogic, useValues } from 'kea' + +import { sourceModalLogic } from '../sourceModalLogic' +import { sourceFormLogic } from './sourceFormLogic' + +export default function PostgresSchemaForm(): JSX.Element { + useMountedLogic(sourceFormLogic({ sourceType: 'Postgres' })) + const { selectSchema } = useActions(sourceModalLogic) + const { databaseSchema } = useValues(sourceModalLogic) + + return ( +
+        <div className="flex flex-col gap-2">
+            <LemonTable
+                dataSource={databaseSchema}
+                columns={[
+                    {
+                        title: 'Table',
+                        key: 'table',
+                        dataIndex: 'table',
+                    },
+                    {
+                        title: 'Sync',
+                        key: 'should_sync',
+                        render: function RenderShouldSync(_, schema) {
+                            return (
+                                <LemonSwitch
+                                    checked={schema.should_sync}
+                                    onChange={() => {
+                                        selectSchema(schema)
+                                    }}
+                                />
+                            )
+                        },
+                    },
+                ]}
+            />
+        </div>
+ ) +} diff --git a/frontend/src/scenes/data-warehouse/external/forms/SourceForm.tsx b/frontend/src/scenes/data-warehouse/external/forms/SourceForm.tsx new file mode 100644 index 0000000000000..df83d79972b5d --- /dev/null +++ b/frontend/src/scenes/data-warehouse/external/forms/SourceForm.tsx @@ -0,0 +1,33 @@ +import { LemonInput } from '@posthog/lemon-ui' +import { Form } from 'kea-forms' +import { Field } from 'lib/forms/Field' + +import { ExternalDataSourceType } from '~/types' + +import { SOURCE_DETAILS } from '../sourceModalLogic' +import { sourceFormLogic } from './sourceFormLogic' + +interface SourceFormProps { + sourceType: ExternalDataSourceType +} + +export default function SourceForm({ sourceType }: SourceFormProps): JSX.Element { + return ( +
+        <Form logic={sourceFormLogic} props={{ sourceType }} formKey="externalDataSource" enableFormOnSubmit>
+            {SOURCE_DETAILS[sourceType].fields.map((field) => (
+                <Field key={field.name} name={['payload', field.name]} label={field.label}>
+                    <LemonInput type={field.type} placeholder={field.placeholder} />
+                </Field>
+            ))}
+            <Field name="prefix" label="Table prefix (optional)">
+                <LemonInput placeholder="internal_" />
+            </Field>
+        </Form>
+ ) +} diff --git a/frontend/src/scenes/data-warehouse/external/sourceFormLogic.tsx b/frontend/src/scenes/data-warehouse/external/forms/sourceFormLogic.ts similarity index 61% rename from frontend/src/scenes/data-warehouse/external/sourceFormLogic.tsx rename to frontend/src/scenes/data-warehouse/external/forms/sourceFormLogic.ts index a2be7fc80e4c7..7ed417510d1d7 100644 --- a/frontend/src/scenes/data-warehouse/external/sourceFormLogic.tsx +++ b/frontend/src/scenes/data-warehouse/external/forms/sourceFormLogic.ts @@ -3,74 +3,17 @@ import { actions, connect, kea, listeners, path, props } from 'kea' import { forms } from 'kea-forms' import { router, urlToAction } from 'kea-router' import api from 'lib/api' -import { Link } from 'lib/lemon-ui/Link' import { urls } from 'scenes/urls' import { ExternalDataSourceCreatePayload, ExternalDataSourceType } from '~/types' +import { getHubspotRedirectUri, sourceModalLogic } from '../sourceModalLogic' import type { sourceFormLogicType } from './sourceFormLogicType' -import { getHubspotRedirectUri, sourceModalLogic } from './sourceModalLogic' export interface SourceFormProps { sourceType: ExternalDataSourceType } -export interface SourceConfig { - name: ExternalDataSourceType - caption: string | JSX.Element - fields: FieldConfig[] - disabledReason?: string | null -} -interface FieldConfig { - name: string - label: string - type: string - required: boolean - placeholder: string -} - -export const SOURCE_DETAILS: Record = { - Stripe: { - name: 'Stripe', - caption: ( - <> - Enter your Stripe credentials to automatically pull your Stripe data into the PostHog Data warehouse. -
- You can find your account ID{' '} - - in your Stripe dashboard - - , and create a secret key{' '} - - here - - . - - ), - fields: [ - { - name: 'account_id', - label: 'Account ID', - type: 'text', - required: true, - placeholder: 'acct_...', - }, - { - name: 'client_secret', - label: 'Client Secret', - type: 'text', - required: true, - placeholder: 'sk_live_...', - }, - ], - }, - Hubspot: { - name: 'Hubspot', - fields: [], - caption: '', - }, -} - const getPayloadDefaults = (sourceType: string): Record => { switch (sourceType) { case 'Stripe': @@ -101,16 +44,21 @@ export const sourceFormLogic = kea([ path(['scenes', 'data-warehouse', 'external', 'sourceFormLogic']), props({} as SourceFormProps), connect({ - actions: [sourceModalLogic, ['onClear', 'toggleSourceModal', 'loadSources']], + actions: [ + sourceModalLogic, + ['setDatabaseSchemas', 'onBack', 'onNext', 'selectConnector', 'toggleSourceModal', 'loadSources'], + ], }), actions({ - onBack: true, + onCancel: true, handleRedirect: (kind: string, searchParams: any) => ({ kind, searchParams }), + onPostgresNext: true, }), listeners(({ actions }) => ({ - onBack: () => { + onCancel: () => { actions.resetExternalDataSource() - actions.onClear() + actions.onBack() + actions.selectConnector(null) }, submitExternalDataSourceSuccess: () => { lemonToast.success('New Data Resource Created') @@ -119,6 +67,9 @@ export const sourceFormLogic = kea([ actions.loadSources() router.actions.push(urls.dataWarehouseSettings()) }, + submitDatabaseSchemaFormSuccess: () => { + actions.onNext() + }, submitExternalDataSourceFailure: ({ error }) => { lemonToast.error(error?.message || 'Something went wrong') }, @@ -136,13 +87,14 @@ export const sourceFormLogic = kea([ lemonToast.error(`Something went wrong.`) } }, + onPostgresNext: async () => {}, })), urlToAction(({ actions }) => ({ '/data-warehouse/:kind/redirect': ({ kind = '' }, searchParams) => { actions.handleRedirect(kind, searchParams) }, })), - forms(({ props }) => ({ + forms(({ props, actions }) => ({ externalDataSource: { defaults: { prefix: '', @@ -155,5 +107,51 @@ export const sourceFormLogic = kea([ return newResource }, }, + databaseSchemaForm: { + defaults: { + prefix: '', + payload: { + host: '', + port: '', + dbname: '', + user: '', + password: '', + schema: '', + }, + }, + errors: ({ payload: { host, port, dbname, user, password, schema } }) => ({ + payload: { + host: !host && 'Please enter a host.', + port: !port && 'Please enter a port.', + dbname: !dbname && 'Please enter a dbname.', + user: !user && 'Please enter a user.', + password: !password && 'Please enter a password.', + schema: !schema && 'Please enter a schema.', + }, + }), + submit: async ({ payload: { host, port, dbname, user, password, schema }, prefix }) => { + const schemas = await api.externalDataSources.database_schema( + host, + port, + dbname, + user, + password, + schema + ) + actions.setDatabaseSchemas(schemas) + + return { + payload: { + host, + port, + dbname, + user, + password, + schema, + }, + prefix, + } + }, + }, })), ]) diff --git a/frontend/src/scenes/data-warehouse/external/sourceModalLogic.ts b/frontend/src/scenes/data-warehouse/external/sourceModalLogic.ts deleted file mode 100644 index 852406e2b1b02..0000000000000 --- a/frontend/src/scenes/data-warehouse/external/sourceModalLogic.ts +++ /dev/null @@ -1,104 +0,0 @@ -import { actions, connect, kea, listeners, path, reducers, selectors } from 'kea' -import { preflightLogic } from 'scenes/PreflightCheck/preflightLogic' - -import { dataWarehouseTableLogic } from 
'../new_table/dataWarehouseTableLogic' -import { dataWarehouseSettingsLogic } from '../settings/dataWarehouseSettingsLogic' -import { dataWarehouseSceneLogic } from './dataWarehouseSceneLogic' -import { SOURCE_DETAILS, SourceConfig } from './sourceFormLogic' -import type { sourceModalLogicType } from './sourceModalLogicType' - -export const getHubspotRedirectUri = (): string => `${window.location.origin}/data-warehouse/hubspot/redirect` - -export const sourceModalLogic = kea([ - path(['scenes', 'data-warehouse', 'external', 'sourceModalLogic']), - actions({ - selectConnector: (connector: SourceConfig | null) => ({ connector }), - toggleManualLinkFormVisible: (visible: boolean) => ({ visible }), - handleRedirect: (kind: string, searchParams: any) => ({ kind, searchParams }), - onClear: true, - }), - connect({ - values: [ - dataWarehouseTableLogic, - ['tableLoading'], - dataWarehouseSettingsLogic, - ['dataWarehouseSources'], - preflightLogic, - ['preflight'], - ], - actions: [ - dataWarehouseSceneLogic, - ['toggleSourceModal'], - dataWarehouseTableLogic, - ['resetTable'], - dataWarehouseSettingsLogic, - ['loadSources'], - ], - }), - reducers({ - selectedConnector: [ - null as SourceConfig | null, - { - selectConnector: (_, { connector }) => connector, - }, - ], - isManualLinkFormVisible: [ - false, - { - toggleManualLinkFormVisible: (_, { visible }) => visible, - }, - ], - }), - selectors({ - showFooter: [ - (s) => [s.selectedConnector, s.isManualLinkFormVisible], - (selectedConnector, isManualLinkFormVisible) => selectedConnector || isManualLinkFormVisible, - ], - connectors: [ - (s) => [s.dataWarehouseSources], - (sources): SourceConfig[] => { - return Object.values(SOURCE_DETAILS).map((connector) => ({ - ...connector, - disabledReason: - sources && sources.results.find((source) => source.source_type === connector.name) - ? 
'Already linked' - : null, - })) - }, - ], - addToHubspotButtonUrl: [ - (s) => [s.preflight], - (preflight) => { - return () => { - const clientId = preflight?.data_warehouse_integrations?.hubspot.client_id - - if (!clientId) { - return null - } - - const scopes = [ - 'crm.objects.contacts.read', - 'crm.objects.companies.read', - 'crm.objects.deals.read', - 'tickets', - 'crm.objects.quotes.read', - ] - - const params = new URLSearchParams() - params.set('client_id', clientId) - params.set('redirect_uri', getHubspotRedirectUri()) - params.set('scope', scopes.join(' ')) - - return `https://app.hubspot.com/oauth/authorize?${params.toString()}` - } - }, - ], - }), - listeners(({ actions }) => ({ - onClear: () => { - actions.selectConnector(null) - actions.toggleManualLinkFormVisible(false) - actions.resetTable() - }, - })), -]) diff --git a/frontend/src/scenes/data-warehouse/external/sourceModalLogic.tsx b/frontend/src/scenes/data-warehouse/external/sourceModalLogic.tsx new file mode 100644 index 0000000000000..9c31f5f7e4811 --- /dev/null +++ b/frontend/src/scenes/data-warehouse/external/sourceModalLogic.tsx @@ -0,0 +1,295 @@ +import { Link } from '@posthog/lemon-ui' +import { actions, connect, kea, listeners, path, reducers, selectors } from 'kea' +import { preflightLogic } from 'scenes/PreflightCheck/preflightLogic' + +import { ExternalDataPostgresSchema, ExternalDataSourceType } from '~/types' + +import { dataWarehouseTableLogic } from '../new_table/dataWarehouseTableLogic' +import { dataWarehouseSettingsLogic } from '../settings/dataWarehouseSettingsLogic' +import { dataWarehouseSceneLogic } from './dataWarehouseSceneLogic' +import { sourceFormLogic } from './forms/sourceFormLogic' +import type { sourceModalLogicType } from './sourceModalLogicType' + +export const getHubspotRedirectUri = (): string => `${window.location.origin}/data-warehouse/hubspot/redirect` + +export interface SourceConfig { + name: ExternalDataSourceType + caption: string | React.ReactNode + fields: FieldConfig[] + disabledReason?: string | null +} +interface FieldConfig { + name: string + label: string + type: string + required: boolean + placeholder: string +} + +export const SOURCE_DETAILS: Record = { + Stripe: { + name: 'Stripe', + caption: ( + <> + Enter your Stripe credentials to automatically pull your Stripe data into the PostHog Data warehouse. +
+ You can find your account ID{' '} + + in your Stripe dashboard + + , and create a secret key{' '} + + here + + . + + ), + fields: [ + { + name: 'account_id', + label: 'Account ID', + type: 'text', + required: true, + placeholder: 'acct_...', + }, + { + name: 'client_secret', + label: 'Client Secret', + type: 'text', + required: true, + placeholder: 'sk_live_...', + }, + ], + }, + Hubspot: { + name: 'Hubspot', + fields: [], + caption: '', + }, + Postgres: { + name: 'Postgres', + caption: ( + <> + Enter your Postgres credentials to automatically pull your Postgres data into the PostHog Data + warehouse. + + ), + fields: [ + { + name: 'host', + label: 'Host', + type: 'text', + required: true, + placeholder: 'localhost', + }, + { + name: 'port', + label: 'Port', + type: 'number', + required: true, + placeholder: '5432', + }, + { + name: 'dbname', + label: 'Database', + type: 'text', + required: true, + placeholder: 'postgres', + }, + { + name: 'user', + label: 'User', + type: 'text', + required: true, + placeholder: 'postgres', + }, + { + name: 'password', + label: 'Password', + type: 'password', + required: true, + placeholder: 'password', + }, + { + name: 'schema', + label: 'Schema', + type: 'text', + required: true, + placeholder: 'public', + }, + ], + }, +} + +export const sourceModalLogic = kea([ + path(['scenes', 'data-warehouse', 'external', 'sourceModalLogic']), + actions({ + selectConnector: (connector: SourceConfig | null) => ({ connector }), + toggleManualLinkFormVisible: (visible: boolean) => ({ visible }), + handleRedirect: (kind: string, searchParams: any) => ({ kind, searchParams }), + onClear: true, + onBack: true, + onNext: true, + onSubmit: true, + setDatabaseSchemas: (schemas: ExternalDataPostgresSchema[]) => ({ schemas }), + selectSchema: (schema: ExternalDataPostgresSchema) => ({ schema }), + }), + connect({ + values: [ + dataWarehouseTableLogic, + ['tableLoading'], + dataWarehouseSettingsLogic, + ['dataWarehouseSources'], + preflightLogic, + ['preflight'], + ], + actions: [ + dataWarehouseSceneLogic, + ['toggleSourceModal'], + dataWarehouseTableLogic, + ['resetTable'], + dataWarehouseSettingsLogic, + ['loadSources'], + ], + }), + reducers({ + selectedConnector: [ + null as SourceConfig | null, + { + selectConnector: (_, { connector }) => connector, + }, + ], + isManualLinkFormVisible: [ + false, + { + toggleManualLinkFormVisible: (_, { visible }) => visible, + }, + ], + currentStep: [ + 1, + { + onNext: (state) => state + 1, + onBack: (state) => state - 1, + onClear: () => 1, + }, + ], + databaseSchema: [ + [] as ExternalDataPostgresSchema[], + { + setDatabaseSchemas: (_, { schemas }) => schemas, + selectSchema: (state, { schema }) => { + const newSchema = state.map((s) => ({ + ...s, + should_sync: s.table === schema.table ? !s.should_sync : s.should_sync, + })) + return newSchema + }, + }, + ], + }), + selectors({ + showFooter: [ + (s) => [s.selectedConnector, s.isManualLinkFormVisible], + (selectedConnector, isManualLinkFormVisible) => selectedConnector || isManualLinkFormVisible, + ], + connectors: [ + (s) => [s.dataWarehouseSources], + (sources): SourceConfig[] => { + return Object.values(SOURCE_DETAILS).map((connector) => ({ + ...connector, + disabledReason: + sources && sources.results.find((source) => source.source_type === connector.name) + ? 
'Already linked' + : null, + })) + }, + ], + addToHubspotButtonUrl: [ + (s) => [s.preflight], + (preflight) => { + return () => { + const clientId = preflight?.data_warehouse_integrations?.hubspot.client_id + + if (!clientId) { + return null + } + + const scopes = [ + 'crm.objects.contacts.read', + 'crm.objects.companies.read', + 'crm.objects.deals.read', + 'tickets', + 'crm.objects.quotes.read', + ] + + const params = new URLSearchParams() + params.set('client_id', clientId) + params.set('redirect_uri', getHubspotRedirectUri()) + params.set('scope', scopes.join(' ')) + + return `https://app.hubspot.com/oauth/authorize?${params.toString()}` + } + }, + ], + modalTitle: [ + (s) => [s.currentStep], + (currentStep) => { + if (currentStep === 1) { + return 'Select a data source to get started' + } + if (currentStep === 2) { + return 'Link your data source' + } + + if (currentStep === 3) { + return 'Select tables to import' + } + + return '' + }, + ], + modalCaption: [ + (s) => [s.selectedConnector, s.currentStep], + (selectedConnector, currentStep) => { + if (currentStep == 2 && selectedConnector) { + return SOURCE_DETAILS[selectedConnector.name]?.caption + } + + return '' + }, + ], + }), + listeners(({ actions, values }) => ({ + onClear: () => { + actions.selectConnector(null) + actions.toggleManualLinkFormVisible(false) + actions.resetTable() + }, + onSubmit: () => { + // Shared function that triggers different actions depending on the current step + + if (values.currentStep === 1) { + return + } + + if (values.currentStep === 2) { + if (values.selectedConnector?.name === 'Postgres') { + sourceFormLogic({ sourceType: 'Postgres' }).actions.submitDatabaseSchemaForm() + } else if (values.selectedConnector?.name) { + sourceFormLogic({ sourceType: values.selectedConnector?.name }).actions.submitExternalDataSource() + } + } + + if (values.currentStep === 3) { + const logic = sourceFormLogic({ sourceType: 'Postgres' }) + + logic.actions.setExternalDataSourceValue('payload', { + ...logic.values.databaseSchemaForm.payload, + schemas: values.databaseSchema.filter((schema) => schema.should_sync).map((schema) => schema.table), + }) + logic.actions.setExternalDataSourceValue('prefix', logic.values.databaseSchemaForm.prefix) + logic.actions.submitExternalDataSource() + } + }, + })), +]) diff --git a/frontend/src/scenes/data-warehouse/new_table/DataWarehouseTableForm.tsx b/frontend/src/scenes/data-warehouse/new_table/DataWarehouseTableForm.tsx index ae6d5120d8316..bd00fa492c5ae 100644 --- a/frontend/src/scenes/data-warehouse/new_table/DataWarehouseTableForm.tsx +++ b/frontend/src/scenes/data-warehouse/new_table/DataWarehouseTableForm.tsx @@ -4,11 +4,7 @@ import { Field } from 'lib/forms/Field' import { dataWarehouseTableLogic } from './dataWarehouseTableLogic' -interface DataWarehouseTableFormProps { - footer?: JSX.Element -} - -export function DatawarehouseTableForm({ footer }: DataWarehouseTableFormProps): JSX.Element { +export function DatawarehouseTableForm(): JSX.Element { return (
@@ -76,7 +72,6 @@ export function DatawarehouseTableForm({ footer }: DataWarehouseTableFormProps): />
- {footer}
) } diff --git a/frontend/src/scenes/data-warehouse/redirect/DataWarehouseRedirectScene.tsx b/frontend/src/scenes/data-warehouse/redirect/DataWarehouseRedirectScene.tsx index 217d776bb7236..89b23c525c094 100644 --- a/frontend/src/scenes/data-warehouse/redirect/DataWarehouseRedirectScene.tsx +++ b/frontend/src/scenes/data-warehouse/redirect/DataWarehouseRedirectScene.tsx @@ -1,7 +1,7 @@ import { LemonButton, LemonInput } from '@posthog/lemon-ui' import { Form } from 'kea-forms' import { Field } from 'lib/forms/Field' -import { sourceFormLogic } from 'scenes/data-warehouse/external/sourceFormLogic' +import { sourceFormLogic } from 'scenes/data-warehouse/external/forms/sourceFormLogic' import { SceneExport } from 'scenes/sceneTypes' export const scene: SceneExport = { diff --git a/frontend/src/types.ts b/frontend/src/types.ts index c84b668a452e5..9adcfc3e681af 100644 --- a/frontend/src/types.ts +++ b/frontend/src/types.ts @@ -3396,7 +3396,7 @@ export interface DataWarehouseViewLink { from_join_key?: string } -export type ExternalDataSourceType = 'Stripe' | 'Hubspot' +export type ExternalDataSourceType = 'Stripe' | 'Hubspot' | 'Postgres' export interface ExternalDataSourceCreatePayload { source_type: ExternalDataSourceType @@ -3420,6 +3420,11 @@ export interface SimpleExternalDataSourceSchema { last_synced_at?: Dayjs } +export interface ExternalDataPostgresSchema { + table: string + should_sync: boolean +} + export interface ExternalDataSourceSchema extends SimpleExternalDataSourceSchema { table?: SimpleDataWarehouseTable } diff --git a/latest_migrations.manifest b/latest_migrations.manifest index 81538fe8c3352..b60532a5c5307 100644 --- a/latest_migrations.manifest +++ b/latest_migrations.manifest @@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name ee: 0015_add_verified_properties otp_static: 0002_throttling otp_totp: 0002_auto_20190420_0723 -posthog: 0386_add_session_replay_config_to_team +posthog: 0387_alter_externaldatasource_source_type sessions: 0001_initial social_django: 0010_uid_db_index two_factor: 0007_auto_20201201_1019 diff --git a/posthog/migrations/0387_alter_externaldatasource_source_type.py b/posthog/migrations/0387_alter_externaldatasource_source_type.py new file mode 100644 index 0000000000000..649710d87a6cf --- /dev/null +++ b/posthog/migrations/0387_alter_externaldatasource_source_type.py @@ -0,0 +1,19 @@ +# Generated by Django 3.2.19 on 2024-01-16 19:56 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("posthog", "0386_add_session_replay_config_to_team"), + ] + + operations = [ + migrations.AlterField( + model_name="externaldatasource", + name="source_type", + field=models.CharField( + choices=[("Stripe", "Stripe"), ("Hubspot", "Hubspot"), ("Postgres", "Postgres")], max_length=128 + ), + ), + ] diff --git a/posthog/temporal/data_imports/external_data_job.py b/posthog/temporal/data_imports/external_data_job.py index 0f380f0a542b4..b9a12ade6b236 100644 --- a/posthog/temporal/data_imports/external_data_job.py +++ b/posthog/temporal/data_imports/external_data_job.py @@ -24,9 +24,11 @@ sync_old_schemas_with_new_schemas, ExternalDataSource, ) +from posthog.warehouse.models.external_data_schema import get_postgres_schemas from posthog.temporal.common.logger import bind_temporal_worker_logger from typing import Tuple import asyncio +from django.conf import settings @dataclasses.dataclass @@ -49,9 +51,21 @@ async def create_external_data_job_model(inputs: CreateExternalDataJobInputs) -> source.status = "Running" 
    await sync_to_async(source.save)()  # type: ignore
 
-    # Sync schemas if they have changed
+    if source.source_type == ExternalDataSource.Type.POSTGRES:
+        host = source.job_inputs.get("host")
+        port = source.job_inputs.get("port")
+        user = source.job_inputs.get("user")
+        password = source.job_inputs.get("password")
+        database = source.job_inputs.get("database")
+        schema = source.job_inputs.get("schema")
+        schemas_to_sync = await sync_to_async(get_postgres_schemas)(  # type: ignore
+            host, port, database, user, password, schema
+        )
+    else:
+        schemas_to_sync = list(PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING[source.source_type])
+
     await sync_to_async(sync_old_schemas_with_new_schemas)(  # type: ignore
-        list(PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING[source.source_type]),  # type: ignore
+        schemas_to_sync,
         source_id=inputs.external_data_source_id,
         team_id=inputs.team_id,
     )
@@ -175,6 +189,27 @@ async def run_external_data_job(inputs: ExternalDataJobInputs) -> None:
             team_id=inputs.team_id,
             endpoints=tuple(inputs.schemas),
         )
+    elif model.pipeline.source_type == ExternalDataSource.Type.POSTGRES:
+        from posthog.temporal.data_imports.pipelines.postgres import postgres_source
+
+        host = model.pipeline.job_inputs.get("host")
+        port = model.pipeline.job_inputs.get("port")
+        user = model.pipeline.job_inputs.get("user")
+        password = model.pipeline.job_inputs.get("password")
+        database = model.pipeline.job_inputs.get("database")
+        schema = model.pipeline.job_inputs.get("schema")
+
+        source = postgres_source(
+            host=host,
+            port=port,
+            user=user,
+            password=password,
+            database=database,
+            sslmode="prefer" if settings.TEST or settings.DEBUG else "require",
+            schema=schema,
+            table_names=inputs.schemas,
+        )
+
     else:
         raise ValueError(f"Source type {model.pipeline.source_type} not supported")
 
diff --git a/posthog/temporal/data_imports/pipelines/postgres/__init__.py b/posthog/temporal/data_imports/pipelines/postgres/__init__.py
new file mode 100644
index 0000000000000..438b25fbe9dac
--- /dev/null
+++ b/posthog/temporal/data_imports/pipelines/postgres/__init__.py
@@ -0,0 +1,72 @@
+"""Source that loads tables from any SQLAlchemy supported database, supports batching requests and incremental loads."""
+
+from typing import List, Optional, Union, Iterable, Any
+from sqlalchemy import MetaData, Table, text
+from sqlalchemy.engine import Engine
+
+import dlt
+from dlt.sources import DltResource, DltSource
+
+
+from dlt.sources.credentials import ConnectionStringCredentials
+
+from .helpers import (
+    table_rows,
+    engine_from_credentials,
+    get_primary_key,
+    SqlDatabaseTableConfiguration,
+    SqlTableResourceConfiguration,
+)
+
+
+def postgres_source(
+    host: str, port: int, user: str, password: str, database: str, sslmode: str, schema: str, table_names: list[str]
+) -> DltSource:
+    credentials = ConnectionStringCredentials(
+        f"postgresql://{user}:{password}@{host}:{port}/{database}?sslmode={sslmode}"
+    )
+    db_source = sql_database(credentials, schema=schema, table_names=table_names)
+
+    return db_source
+
+
+@dlt.source
+def sql_database(
+    credentials: Union[ConnectionStringCredentials, Engine, str] = dlt.secrets.value,
+    schema: Optional[str] = dlt.config.value,
+    metadata: Optional[MetaData] = None,
+    table_names: Optional[List[str]] = dlt.config.value,
+) -> Iterable[DltResource]:
+    """
+    A DLT source which loads data from an SQL database using SQLAlchemy.
+    Resources are automatically created for each table in the schema or from the given list of tables.
+ + Args: + credentials (Union[ConnectionStringCredentials, Engine, str]): Database credentials or an `sqlalchemy.Engine` instance. + schema (Optional[str]): Name of the database schema to load (if different from default). + metadata (Optional[MetaData]): Optional `sqlalchemy.MetaData` instance. `schema` argument is ignored when this is used. + table_names (Optional[List[str]]): A list of table names to load. By default, all tables in the schema are loaded. + + Returns: + Iterable[DltResource]: A list of DLT resources for each table to be loaded. + """ + + # set up alchemy engine + engine = engine_from_credentials(credentials) + engine.execution_options(stream_results=True) + metadata = metadata or MetaData(schema=schema) + + # use provided tables or all tables + if table_names: + tables = [Table(name, metadata, autoload_with=engine) for name in table_names] + else: + metadata.reflect(bind=engine) + tables = list(metadata.tables.values()) + + for table in tables: + yield dlt.resource( + table_rows, + name=table.name, + primary_key=get_primary_key(table), + spec=SqlDatabaseTableConfiguration, + )(engine, table) diff --git a/posthog/temporal/data_imports/pipelines/postgres/helpers.py b/posthog/temporal/data_imports/pipelines/postgres/helpers.py new file mode 100644 index 0000000000000..7d45a6df7e302 --- /dev/null +++ b/posthog/temporal/data_imports/pipelines/postgres/helpers.py @@ -0,0 +1,124 @@ +"""SQL database source helpers""" + +from typing import ( + Any, + List, + Optional, + Iterator, + Union, +) +import operator + +import dlt +from dlt.sources.credentials import ConnectionStringCredentials +from dlt.common.configuration.specs import BaseConfiguration, configspec +from dlt.common.typing import TDataItem +from .settings import DEFAULT_CHUNK_SIZE + +from sqlalchemy import Table, create_engine, Column +from sqlalchemy.engine import Engine +from sqlalchemy.sql import Select + + +class TableLoader: + def __init__( + self, + engine: Engine, + table: Table, + chunk_size: int = 1000, + incremental: Optional[dlt.sources.incremental[Any]] = None, + ) -> None: + self.engine = engine + self.table = table + self.chunk_size = chunk_size + self.incremental = incremental + if incremental: + try: + self.cursor_column: Optional[Column[Any]] = table.c[incremental.cursor_path] + except KeyError as e: + raise KeyError( + f"Cursor column '{incremental.cursor_path}' does not exist in table '{table.name}'" + ) from e + self.last_value = incremental.last_value + else: + self.cursor_column = None + self.last_value = None + + def make_query(self) -> Select[Any]: + table = self.table + query = table.select() + if not self.incremental: + return query + last_value_func = self.incremental.last_value_func + if last_value_func is max: # Query ordered and filtered according to last_value function + order_by = self.cursor_column.asc() # type: ignore + filter_op = operator.ge + elif last_value_func is min: + order_by = self.cursor_column.desc() # type: ignore + filter_op = operator.le + else: # Custom last_value, load everything and let incremental handle filtering + return query + query = query.order_by(order_by) + if self.last_value is None: + return query + return query.where(filter_op(self.cursor_column, self.last_value)) # type: ignore + + def load_rows(self) -> Iterator[List[TDataItem]]: + query = self.make_query() + with self.engine.connect() as conn: + result = conn.execution_options(yield_per=self.chunk_size).execute(query) + for partition in result.partitions(size=self.chunk_size): + yield [dict(row._mapping) 
for row in partition]
+
+
+def table_rows(
+    engine: Engine,
+    table: Table,
+    chunk_size: int = DEFAULT_CHUNK_SIZE,
+    incremental: Optional[dlt.sources.incremental[Any]] = None,
+) -> Iterator[TDataItem]:
+    """
+    Yield rows from a single table in chunks, via a TableLoader.
+
+    Args:
+        engine (Engine): An `sqlalchemy.Engine` connected to the source database.
+        table (Table): The `sqlalchemy.Table` to load rows from.
+        chunk_size (int): Number of rows to fetch per batch.
+        incremental (Optional[dlt.sources.incremental[Any]]): Optional cursor used to order rows and filter out rows already loaded.
+
+    Yields:
+        TDataItem: Table rows as dictionaries.
+    """
+    loader = TableLoader(engine, table, incremental=incremental, chunk_size=chunk_size)
+    yield from loader.load_rows()
+
+    engine.dispose()
+
+
+def engine_from_credentials(credentials: Union[ConnectionStringCredentials, Engine, str]) -> Engine:
+    if isinstance(credentials, Engine):
+        return credentials
+    if isinstance(credentials, ConnectionStringCredentials):
+        credentials = credentials.to_native_representation()
+    return create_engine(credentials)
+
+
+def get_primary_key(table: Table) -> List[str]:
+    return [c.name for c in table.primary_key]
+
+
+@configspec
+class SqlDatabaseTableConfiguration(BaseConfiguration):
+    incremental: Optional[dlt.sources.incremental] = None
+
+
+@configspec
+class SqlTableResourceConfiguration(BaseConfiguration):
+    credentials: ConnectionStringCredentials
+    table: str
+    incremental: Optional[dlt.sources.incremental] = None
+    schema: Optional[str]
+
+
+__source_name__ = "sql_database"
diff --git a/posthog/temporal/data_imports/pipelines/postgres/settings.py b/posthog/temporal/data_imports/pipelines/postgres/settings.py
new file mode 100644
index 0000000000000..f1aa2d4b26401
--- /dev/null
+++ b/posthog/temporal/data_imports/pipelines/postgres/settings.py
@@ -0,0 +1,3 @@
+"""SQL database source settings and constants"""
+
+DEFAULT_CHUNK_SIZE = 1000
diff --git a/posthog/temporal/data_imports/pipelines/schemas.py b/posthog/temporal/data_imports/pipelines/schemas.py
index eaaa431d7aef9..371f8087b7966 100644
--- a/posthog/temporal/data_imports/pipelines/schemas.py
+++ b/posthog/temporal/data_imports/pipelines/schemas.py
@@ -5,4 +5,5 @@
 PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING = {
     ExternalDataSource.Type.STRIPE: STRIPE_ENDPOINTS,
     ExternalDataSource.Type.HUBSPOT: HUBSPOT_ENDPOINTS,
+    ExternalDataSource.Type.POSTGRES: (),
 }
diff --git a/posthog/temporal/tests/external_data/conftest.py b/posthog/temporal/tests/external_data/conftest.py
new file mode 100644
index 0000000000000..1d2fbcf47b8f0
--- /dev/null
+++ b/posthog/temporal/tests/external_data/conftest.py
@@ -0,0 +1,71 @@
+import psycopg
+import pytest_asyncio
+from psycopg import sql
+
+
+@pytest_asyncio.fixture
+async def setup_postgres_test_db(postgres_config):
+    """Fixture to manage a Postgres database for external data source testing.
+
+    Managing a test database involves the following steps:
+    1. Creating a test database.
+    2. Initializing a connection to that database.
+    3. Creating a test schema.
+    4. Yielding the connection to be used in tests.
+    5. After tests, drop the test schema and any tables in it.
+    6. Drop the test database.
+ """ + connection = await psycopg.AsyncConnection.connect( + user=postgres_config["user"], + password=postgres_config["password"], + host=postgres_config["host"], + port=postgres_config["port"], + ) + await connection.set_autocommit(True) + + async with connection.cursor() as cursor: + await cursor.execute( + sql.SQL("SELECT 1 FROM pg_database WHERE datname = %s"), + (postgres_config["database"],), + ) + + if await cursor.fetchone() is None: + await cursor.execute(sql.SQL("CREATE DATABASE {}").format(sql.Identifier(postgres_config["database"]))) + + await connection.close() + + # We need a new connection to connect to the database we just created. + connection = await psycopg.AsyncConnection.connect( + user=postgres_config["user"], + password=postgres_config["password"], + host=postgres_config["host"], + port=postgres_config["port"], + dbname=postgres_config["database"], + ) + await connection.set_autocommit(True) + + async with connection.cursor() as cursor: + await cursor.execute( + sql.SQL("CREATE SCHEMA IF NOT EXISTS {}").format(sql.Identifier(postgres_config["schema"])) + ) + + yield + + async with connection.cursor() as cursor: + await cursor.execute(sql.SQL("DROP SCHEMA {} CASCADE").format(sql.Identifier(postgres_config["schema"]))) + + await connection.close() + + # We need a new connection to drop the database, as we cannot drop the current database. + connection = await psycopg.AsyncConnection.connect( + user=postgres_config["user"], + password=postgres_config["password"], + host=postgres_config["host"], + port=postgres_config["port"], + ) + await connection.set_autocommit(True) + + async with connection.cursor() as cursor: + await cursor.execute(sql.SQL("DROP DATABASE {}").format(sql.Identifier(postgres_config["database"]))) + + await connection.close() diff --git a/posthog/temporal/tests/test_external_data_job.py b/posthog/temporal/tests/external_data/test_external_data_job.py similarity index 81% rename from posthog/temporal/tests/test_external_data_job.py rename to posthog/temporal/tests/external_data/test_external_data_job.py index 1af196f368831..a4574502ff9d4 100644 --- a/posthog/temporal/tests/test_external_data_job.py +++ b/posthog/temporal/tests/external_data/test_external_data_job.py @@ -41,6 +41,7 @@ import functools from django.conf import settings import asyncio +import psycopg BUCKET_NAME = "test-external-data-jobs" SESSION = aioboto3.Session() @@ -89,6 +90,33 @@ async def minio_client(bucket_name): await minio_client.delete_bucket(Bucket=bucket_name) +@pytest.fixture +def postgres_config(): + return { + "user": settings.PG_USER, + "password": settings.PG_PASSWORD, + "database": "external_data_database", + "schema": "external_data_schema", + "host": settings.PG_HOST, + "port": int(settings.PG_PORT), + } + + +@pytest_asyncio.fixture +async def postgres_connection(postgres_config, setup_postgres_test_db): + connection = await psycopg.AsyncConnection.connect( + user=postgres_config["user"], + password=postgres_config["password"], + dbname=postgres_config["database"], + host=postgres_config["host"], + port=postgres_config["port"], + ) + + yield connection + + await connection.close() + + @pytest.mark.django_db(transaction=True) @pytest.mark.asyncio async def test_create_external_job_activity(activity_environment, team, **kwargs): @@ -110,15 +138,14 @@ async def test_create_external_job_activity(activity_environment, team, **kwargs runs = ExternalDataJob.objects.filter(id=run_id) assert await sync_to_async(runs.exists)() # type:ignore - assert len(schemas) == 
len(PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING[new_source.source_type]) + assert len(schemas) == 0 + count = await sync_to_async(ExternalDataSchema.objects.filter(source_id=new_source.pk).count)() # type:ignore + assert count == len(PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING[new_source.source_type]) @pytest.mark.django_db(transaction=True) @pytest.mark.asyncio async def test_create_external_job_activity_schemas_exist(activity_environment, team, **kwargs): - """ - Test that the create external job activity creates a new job - """ new_source = await sync_to_async(ExternalDataSource.objects.create)( source_id=uuid.uuid4(), connection_id=uuid.uuid4(), @@ -147,8 +174,10 @@ async def test_create_external_job_activity_schemas_exist(activity_environment, runs = ExternalDataJob.objects.filter(id=run_id) assert await sync_to_async(runs.exists)() # type:ignore - # one less schema because one of the schemas is turned off - assert len(schemas) == len(PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING[new_source.source_type]) - 1 + assert len(schemas) == 1 + # doesn't overlap + count = await sync_to_async(ExternalDataSchema.objects.filter(source_id=new_source.pk).count)() # type:ignore + assert count == len(PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING[new_source.source_type]) @pytest.mark.django_db(transaction=True) @@ -444,6 +473,14 @@ async def test_external_data_job_workflow_with_schema(team, **kwargs): external_data_source_id=new_source.pk, ) + schemas = PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING[new_source.source_type] + for schema in schemas: + await sync_to_async(ExternalDataSchema.objects.create)( # type: ignore + name=schema, + team_id=team.id, + source_id=new_source.pk, + ) + async def mock_async_func(inputs): pass @@ -480,3 +517,70 @@ async def mock_async_func(inputs): assert await sync_to_async(DataWarehouseTable.objects.filter(external_data_source_id=new_source.pk).count)() == len( # type: ignore PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING[new_source.source_type] ) + + +@pytest.mark.django_db(transaction=True) +@pytest.mark.asyncio +async def test_run_postgres_job( + activity_environment, team, minio_client, postgres_connection, postgres_config, **kwargs +): + await postgres_connection.execute( + "CREATE TABLE IF NOT EXISTS {schema}.posthog_test (id integer)".format(schema=postgres_config["schema"]) + ) + await postgres_connection.execute( + "INSERT INTO {schema}.posthog_test (id) VALUES (1)".format(schema=postgres_config["schema"]) + ) + await postgres_connection.commit() + + async def setup_job_1(): + new_source = await sync_to_async(ExternalDataSource.objects.create)( + source_id=uuid.uuid4(), + connection_id=uuid.uuid4(), + destination_id=uuid.uuid4(), + team=team, + status="running", + source_type="Postgres", + job_inputs={ + "host": postgres_config["host"], + "port": postgres_config["port"], + "database": postgres_config["database"], + "user": postgres_config["user"], + "password": postgres_config["password"], + "schema": postgres_config["schema"], + }, + ) # type: ignore + + new_job: ExternalDataJob = await sync_to_async(ExternalDataJob.objects.create)( # type: ignore + team_id=team.id, + pipeline_id=new_source.pk, + status=ExternalDataJob.Status.RUNNING, + rows_synced=0, + ) + + new_job = await sync_to_async(ExternalDataJob.objects.filter(id=new_job.id).prefetch_related("pipeline").get)() # type: ignore + + schemas = ["posthog_test"] + inputs = ExternalDataJobInputs( + team_id=team.id, + run_id=new_job.pk, + source_id=new_source.pk, + schemas=schemas, + ) + + return new_job, inputs + + job_1, job_1_inputs = await setup_job_1() + 
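Context for the run below: the activity builds `postgres_source` from the job inputs and hands it to a dlt pipeline whose filesystem destination is pointed at the bucket configured by `BUCKET_URL`, which is why the final assertion looks for objects under `{folder_path}/posthog_test/`. A minimal sketch of that wiring, with illustrative `pipeline_name` and `dataset_name` values (the pipeline runner itself is not part of this diff):

```python
import dlt

from posthog.temporal.data_imports.pipelines.postgres import postgres_source

# Build the source the same way run_external_data_job does for a Postgres job.
source = postgres_source(
    host="localhost",
    port=5432,
    user="postgres",
    password="password",
    database="external_data_database",
    sslmode="prefer",
    schema="external_data_schema",
    table_names=["posthog_test"],
)

# dlt's filesystem destination writes one folder per resource (table), so a
# run like this produces files under <dataset_name>/posthog_test/ in the bucket.
pipeline = dlt.pipeline(
    pipeline_name="external_data_job",  # illustrative name
    destination="filesystem",  # BUCKET_URL-style config points this at S3/MinIO
    dataset_name="team_folder_path",  # illustrative; corresponds to the job's folder_path
)
pipeline.run(source)
```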
+ with override_settings( + BUCKET_URL=f"s3://{BUCKET_NAME}", + AIRBYTE_BUCKET_KEY=settings.OBJECT_STORAGE_ACCESS_KEY_ID, + AIRBYTE_BUCKET_SECRET=settings.OBJECT_STORAGE_SECRET_ACCESS_KEY, + ): + await asyncio.gather( + activity_environment.run(run_external_data_job, job_1_inputs), + ) + + job_1_team_objects = await minio_client.list_objects_v2( + Bucket=BUCKET_NAME, Prefix=f"{job_1.folder_path}/posthog_test/" + ) + assert len(job_1_team_objects["Contents"]) == 1 diff --git a/posthog/warehouse/api/external_data_source.py b/posthog/warehouse/api/external_data_source.py index 843821e2f2749..6078e9d8cd6da 100644 --- a/posthog/warehouse/api/external_data_source.py +++ b/posthog/warehouse/api/external_data_source.py @@ -28,8 +28,13 @@ from posthog.temporal.data_imports.pipelines.hubspot.auth import ( get_access_token_from_code, ) +from posthog.warehouse.models.external_data_schema import get_postgres_schemas + import temporalio +from posthog.cloud_utils import is_cloud +from posthog.utils import get_instance_region + logger = structlog.get_logger(__name__) @@ -133,10 +138,23 @@ def create(self, request: Request, *args: Any, **kwargs: Any) -> Response: new_source_model = self._handle_stripe_source(request, *args, **kwargs) elif source_type == ExternalDataSource.Type.HUBSPOT: new_source_model = self._handle_hubspot_source(request, *args, **kwargs) + elif source_type == ExternalDataSource.Type.POSTGRES: + try: + new_source_model, table_names = self._handle_postgres_source(request, *args, **kwargs) + except InternalPostgresError: + return Response( + status=status.HTTP_400_BAD_REQUEST, data={"message": "Cannot use internal Postgres database"} + ) + except Exception: + raise else: raise NotImplementedError(f"Source type {source_type} not implemented") - schemas = PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING[source_type] + if source_type == ExternalDataSource.Type.POSTGRES: + schemas = tuple(table_names) + else: + schemas = PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING[source_type] + for schema in schemas: ExternalDataSchema.objects.create( name=schema, @@ -200,6 +218,43 @@ def _handle_hubspot_source(self, request: Request, *args: Any, **kwargs: Any) -> return new_source_model + def _handle_postgres_source(self, request: Request, *args: Any, **kwargs: Any) -> tuple[ExternalDataSource, list]: + payload = request.data["payload"] + prefix = request.data.get("prefix", None) + source_type = request.data["source_type"] + + host = payload.get("host") + port = payload.get("port") + database = payload.get("dbname") + + user = payload.get("user") + password = payload.get("password") + schema = payload.get("schema") + table_names = payload.get("schemas") + + if not self._validate_postgres_host(host, self.team_id): + raise InternalPostgresError() + + new_source_model = ExternalDataSource.objects.create( + source_id=str(uuid.uuid4()), + connection_id=str(uuid.uuid4()), + destination_id=str(uuid.uuid4()), + team=self.team, + status="Running", + source_type=source_type, + job_inputs={ + "host": host, + "port": port, + "database": database, + "user": user, + "password": password, + "schema": schema, + }, + prefix=prefix, + ) + + return new_source_model, table_names + def prefix_required(self, source_type: str) -> bool: source_type_exists = ExternalDataSource.objects.filter(team_id=self.team.pk, source_type=source_type).exists() return source_type_exists @@ -262,3 +317,45 @@ def reload(self, request: Request, *args: Any, **kwargs: Any): instance.status = "Running" instance.save() return Response(status=status.HTTP_200_OK) + + 
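The `database_schema` action defined next is a plain GET that takes the connection details as query parameters and returns one `{table, should_sync}` entry per table in the schema, matching the `ExternalDataPostgresSchema` shape the frontend consumes. A sketch of calling it, with placeholder project id, host, and credentials; the personal-API-key auth header is an assumption, not something this diff specifies:

```python
import requests

# Placeholder URL, project id and credentials; the response shape comes from
# result_mapped_to_options in the endpoint below.
response = requests.get(
    "https://app.posthog.com/api/projects/1/external_data_sources/database_schema/",
    params={
        "host": "db.example.com",
        "port": "5432",
        "dbname": "postgres",
        "user": "readonly",
        "password": "...",
        "schema": "public",
    },
    headers={"Authorization": "Bearer <personal-api-key>"},  # assumed auth scheme
)
response.raise_for_status()
print(response.json())  # e.g. [{"table": "orders", "should_sync": False}, ...]
```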
@action(methods=["GET"], detail=False) + def database_schema(self, request: Request, *arg: Any, **kwargs: Any): + host = request.query_params.get("host") + port = request.query_params.get("port") + database = request.query_params.get("dbname") + + user = request.query_params.get("user") + password = request.query_params.get("password") + schema = request.query_params.get("schema") + + if not host or not port or not database or not user or not password or not schema: + return Response( + status=status.HTTP_400_BAD_REQUEST, + data={"message": "Missing required parameters: host, port, database, user, password, schema"}, + ) + + # Validate internal postgres + if not self._validate_postgres_host(host, self.team_id): + return Response( + status=status.HTTP_400_BAD_REQUEST, + data={"message": "Cannot use internal Postgres database"}, + ) + + result = get_postgres_schemas(host, port, database, user, password, schema) + result_mapped_to_options = [{"table": row, "should_sync": False} for row in result] + return Response(status=status.HTTP_200_OK, data=result_mapped_to_options) + + def _validate_postgres_host(self, host: str, team_id: int) -> bool: + if host.startswith("172") or host.startswith("10") or host.startswith("localhost"): + if is_cloud(): + region = get_instance_region() + if (region == "US" and team_id == 2) or (region == "EU" and team_id == 1): + return True + else: + return False + + return True + + +class InternalPostgresError(Exception): + pass diff --git a/posthog/warehouse/api/test/test_external_data_source.py b/posthog/warehouse/api/test/test_external_data_source.py index 955c032c0373e..07ce645788d58 100644 --- a/posthog/warehouse/api/test/test_external_data_source.py +++ b/posthog/warehouse/api/test/test_external_data_source.py @@ -5,6 +5,11 @@ from posthog.temporal.data_imports.pipelines.schemas import ( PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING, ) +from django.test import override_settings +from django.conf import settings +from posthog.cloud_utils import TEST_clear_cloud_cache +from posthog.models import Team +import psycopg class TestSavedQuery(APIBaseTest): @@ -135,3 +140,122 @@ def test_reload_external_data_source(self, mock_trigger): self.assertEqual(mock_trigger.call_count, 1) self.assertEqual(response.status_code, 200) self.assertEqual(source.status, "Running") + + def test_database_schema(self): + postgres_connection = psycopg.connect( + host=settings.PG_HOST, + port=settings.PG_PORT, + dbname=settings.PG_DATABASE, + user=settings.PG_USER, + password=settings.PG_PASSWORD, + ) + + with postgres_connection.cursor() as cursor: + cursor.execute( + """ + CREATE TABLE IF NOT EXISTS posthog_test ( + id SERIAL PRIMARY KEY, + name VARCHAR(50) + ); + """ + ) + + postgres_connection.commit() + + response = self.client.get( + f"/api/projects/{self.team.id}/external_data_sources/database_schema/", + data={ + "host": settings.PG_HOST, + "port": int(settings.PG_PORT), + "dbname": settings.PG_DATABASE, + "user": settings.PG_USER, + "password": settings.PG_PASSWORD, + "schema": "public", + }, + ) + results = response.json() + + self.assertEqual(response.status_code, 200) + + table_names = [table["table"] for table in results] + self.assertTrue("posthog_test" in table_names) + + with postgres_connection.cursor() as cursor: + cursor.execute( + """ + DROP TABLE posthog_test; + """ + ) + postgres_connection.commit() + + postgres_connection.close() + + @patch("posthog.warehouse.api.external_data_source.get_postgres_schemas") + def test_internal_postgres(self, patch_get_postgres_schemas): + 
patch_get_postgres_schemas.return_value = ["table_1"] + + TEST_clear_cloud_cache(True) + + with override_settings(REGION="US"): + team_2, _ = Team.objects.get_or_create(id=2, organization=self.team.organization) + response = self.client.get( + f"/api/projects/{team_2.id}/external_data_sources/database_schema/", + data={ + "host": "172.16.0.0", + "port": int(settings.PG_PORT), + "dbname": settings.PG_DATABASE, + "user": settings.PG_USER, + "password": settings.PG_PASSWORD, + "schema": "public", + }, + ) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.json(), [{"should_sync": False, "table": "table_1"}]) + + new_team = Team.objects.create(name="new_team", organization=self.team.organization) + + response = self.client.get( + f"/api/projects/{new_team.id}/external_data_sources/database_schema/", + data={ + "host": "172.16.0.0", + "port": int(settings.PG_PORT), + "dbname": settings.PG_DATABASE, + "user": settings.PG_USER, + "password": settings.PG_PASSWORD, + "schema": "public", + }, + ) + self.assertEqual(response.status_code, 400) + self.assertEqual(response.json(), {"message": "Cannot use internal Postgres database"}) + + with override_settings(REGION="EU"): + team_1, _ = Team.objects.get_or_create(id=1, organization=self.team.organization) + response = self.client.get( + f"/api/projects/{team_1.id}/external_data_sources/database_schema/", + data={ + "host": "172.16.0.0", + "port": int(settings.PG_PORT), + "dbname": settings.PG_DATABASE, + "user": settings.PG_USER, + "password": settings.PG_PASSWORD, + "schema": "public", + }, + ) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.json(), [{"should_sync": False, "table": "table_1"}]) + + new_team = Team.objects.create(name="new_team", organization=self.team.organization) + + response = self.client.get( + f"/api/projects/{new_team.id}/external_data_sources/database_schema/", + data={ + "host": "172.16.0.0", + "port": int(settings.PG_PORT), + "dbname": settings.PG_DATABASE, + "user": settings.PG_USER, + "password": settings.PG_PASSWORD, + "schema": "public", + }, + ) + self.assertEqual(response.status_code, 400) + self.assertEqual(response.json(), {"message": "Cannot use internal Postgres database"}) diff --git a/posthog/warehouse/models/external_data_schema.py b/posthog/warehouse/models/external_data_schema.py index 29419fbb7acd2..8a4ac00e81416 100644 --- a/posthog/warehouse/models/external_data_schema.py +++ b/posthog/warehouse/models/external_data_schema.py @@ -3,6 +3,8 @@ from posthog.models.team import Team from posthog.models.utils import CreatedMetaFields, UUIDModel, sane_repr import uuid +import psycopg +from django.conf import settings class ExternalDataSchema(CreatedMetaFields, UUIDModel): @@ -43,4 +45,26 @@ def sync_old_schemas_with_new_schemas(new_schemas: list, source_id: uuid.UUID, t schemas_to_create = [schema for schema in new_schemas if schema not in old_schemas] for schema in schemas_to_create: - ExternalDataSchema.objects.create(name=schema, team_id=team_id, source_id=source_id) + ExternalDataSchema.objects.create(name=schema, team_id=team_id, source_id=source_id, should_sync=False) + + +def get_postgres_schemas(host: str, port: str, database: str, user: str, password: str, schema: str): + connection = psycopg.Connection.connect( + host=host, + port=int(port), + dbname=database, + user=user, + password=password, + sslmode="prefer" if settings.TEST or settings.DEBUG else "require", + ) + + with connection.cursor() as cursor: + cursor.execute( + "SELECT table_name FROM 
information_schema.tables WHERE table_schema = %(schema)s", {"schema": schema} + ) + result = cursor.fetchall() + result = [row[0] for row in result] + + connection.close() + + return result diff --git a/posthog/warehouse/models/external_data_source.py b/posthog/warehouse/models/external_data_source.py index 5d8f736a77b94..667ba244aca99 100644 --- a/posthog/warehouse/models/external_data_source.py +++ b/posthog/warehouse/models/external_data_source.py @@ -9,6 +9,7 @@ class ExternalDataSource(CreatedMetaFields, UUIDModel): class Type(models.TextChoices): STRIPE = "Stripe", "Stripe" HUBSPOT = "Hubspot", "Hubspot" + POSTGRES = "Postgres", "Postgres" class Status(models.TextChoices): RUNNING = "Running", "Running"
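A note on the incremental path in `helpers.py` above: `TableLoader.make_query` only pushes ordering and filtering into SQL for the built-in `max`/`min` cursor functions; any custom `last_value_func` falls back to a full scan and leaves deduplication to dlt's incremental machinery. A self-contained sketch of the query it builds for the `max` case (table and column names are illustrative):

```python
import operator

from sqlalchemy import Column, DateTime, Integer, MetaData, Table

metadata = MetaData()
events = Table("events", metadata, Column("id", Integer), Column("updated_at", DateTime))

# Mirror TableLoader.make_query when last_value_func is max:
cursor_column = events.c["updated_at"]
last_value = None  # the checkpoint dlt stored from the previous run, if any

query = events.select().order_by(cursor_column.asc())
if last_value is not None:
    # Inclusive lower bound, matching operator.ge in TableLoader.
    query = query.where(operator.ge(cursor_column, last_value))

print(query)  # SELECT events.id, events.updated_at FROM events ORDER BY events.updated_at ASC
```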
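On `_validate_postgres_host` in the API view: the internal-network guard is plain string-prefix matching. It blocks the RFC 1918-style values the tests exercise, but read literally it also matches any hostname that merely begins with those characters. A standalone restatement of the check as written (the function name here is ours, not the patch's):

```python
def looks_internal(host: str) -> bool:
    # Same prefix test as _validate_postgres_host, before the cloud/team allowlist.
    return host.startswith("172") or host.startswith("10") or host.startswith("localhost")

assert looks_internal("172.16.0.0")        # blocked, as in the tests
assert looks_internal("localhost")         # blocked
assert not looks_internal("db.example.com")
assert looks_internal("100.example.com")   # also "blocked": prefix matching is coarse
```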
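One caveat on `postgres_source`: the connection string is built with a plain f-string, so credentials containing URL-reserved characters (`@`, `:`, `/`, `?`) would produce an invalid URL. If that ever needs hardening, the standard-library escape looks like this; a sketch, not part of the patch:

```python
from urllib.parse import quote


def connection_url(user: str, password: str, host: str, port: int, database: str, sslmode: str) -> str:
    # quote() with safe="" percent-encodes @, :, / and ? inside the userinfo parts.
    return (
        f"postgresql://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{database}?sslmode={sslmode}"
    )


print(connection_url("readonly", "p@ss:word", "db.example.com", 5432, "postgres", "require"))
# postgresql://readonly:p%40ss%3Aword@db.example.com:5432/postgres?sslmode=require
```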