([
},
},
})),
- urlToAction(({ actions, props }) => ({
- [urls.dataWarehouseTable(props.id ?? 'new')]: (_, __, ___, { method }) => {
- // If the URL was pushed (user clicked on a link), reset the scene's data.
- // This avoids resetting form fields if you click back/forward.
- if (method === 'PUSH') {
- if (props.id) {
- actions.loadTable()
- } else {
- actions.resetTable()
- }
- }
- },
- })),
- afterMount(async ({ props, actions }) => {
- // if (props.id !== 'new') {
- // await actions.loadTable()
- // }
- if (props.id === 'new') {
- actions.resetTable()
- }
- }),
])
diff --git a/frontend/src/scenes/data-warehouse/external/DataWarehouseExternalScene.tsx b/frontend/src/scenes/data-warehouse/external/DataWarehouseExternalScene.tsx
index 3ce744ff5e5d7..bdbef3bfdbc60 100644
--- a/frontend/src/scenes/data-warehouse/external/DataWarehouseExternalScene.tsx
+++ b/frontend/src/scenes/data-warehouse/external/DataWarehouseExternalScene.tsx
@@ -1,14 +1,18 @@
-import { LemonButton, LemonTag, Link } from '@posthog/lemon-ui'
+import { LemonTag, Link, LemonButtonWithSideAction } from '@posthog/lemon-ui'
import { PageHeader } from 'lib/components/PageHeader'
import { SceneExport } from 'scenes/sceneTypes'
import { urls } from 'scenes/urls'
-import { useValues } from 'kea'
+import { useActions, useValues } from 'kea'
import { router } from 'kea-router'
import { ProductIntroduction } from 'lib/components/ProductIntroduction/ProductIntroduction'
import { ProductKey } from '~/types'
import { DataWarehouseTablesContainer } from './DataWarehouseTables'
import { dataWarehouseSceneLogic } from './dataWarehouseSceneLogic'
import { DataWarehousePageTabs, DataWarehouseTab } from '../DataWarehousePageTabs'
+import SourceModal from './SourceModal'
+import { IconSettings } from 'lib/lemon-ui/icons'
+import { featureFlagLogic } from 'lib/logic/featureFlagLogic'
+import { FEATURE_FLAGS } from 'lib/constants'
export const scene: SceneExport = {
component: DataWarehouseExternalScene,
@@ -16,7 +20,10 @@ export const scene: SceneExport = {
}
export function DataWarehouseExternalScene(): JSX.Element {
- const { shouldShowEmptyState, shouldShowProductIntroduction } = useValues(dataWarehouseSceneLogic)
+ const { shouldShowEmptyState, shouldShowProductIntroduction, isSourceModalOpen } =
+ useValues(dataWarehouseSceneLogic)
+ const { toggleSourceModal } = useActions(dataWarehouseSceneLogic)
+ const { featureFlags } = useValues(featureFlagLogic)
return (
@@ -30,14 +37,20 @@ export function DataWarehouseExternalScene(): JSX.Element {
}
buttons={
-                    !shouldShowProductIntroduction ? (
-                        <LemonButton
-                            type="primary"
-                            to={urls.dataWarehouseTable('new')}
+                    featureFlags[FEATURE_FLAGS.DATA_WAREHOUSE_EXTERNAL_LINK] ? (
+                        <LemonButtonWithSideAction
+                            type="primary"
+                            sideAction={{
+                                icon: <IconSettings />,
+                                onClick: () => router.actions.push(urls.dataWarehouseSettings()),
+                                'data-attr': 'saved-insights-new-insight-dropdown',
+                            }}
+                            data-attr="new-data-warehouse-easy-link"
+                            key={'new-data-warehouse-easy-link'}
+                            onClick={toggleSourceModal}
                        >
-                            New Table
-                        </LemonButton>
+                            Link Source
+                        </LemonButtonWithSideAction>
) : undefined
}
caption={
@@ -59,13 +72,14 @@ export function DataWarehouseExternalScene(): JSX.Element {
description={
'Bring your production database, revenue data, CRM contacts or any other data into PostHog.'
}
- action={() => router.actions.push(urls.dataWarehouseTable('new'))}
+ action={() => toggleSourceModal()}
isEmpty={shouldShowEmptyState}
docsURL="https://posthog.com/docs/data/data-warehouse"
productKey={ProductKey.DATA_WAREHOUSE}
/>
)}
            {!shouldShowEmptyState && <DataWarehouseTablesContainer />}
+            <SourceModal isOpen={isSourceModalOpen} onClose={toggleSourceModal} />
)
}
diff --git a/frontend/src/scenes/data-warehouse/external/SourceModal.tsx b/frontend/src/scenes/data-warehouse/external/SourceModal.tsx
new file mode 100644
index 0000000000000..0c32f1c788958
--- /dev/null
+++ b/frontend/src/scenes/data-warehouse/external/SourceModal.tsx
@@ -0,0 +1,129 @@
+import { LemonButton, LemonDivider, LemonInput, LemonModal, LemonModalProps } from '@posthog/lemon-ui'
+import { Form } from 'kea-forms'
+import { ConnectorConfigType, sourceModalLogic } from './sourceModalLogic'
+import { useActions, useValues } from 'kea'
+import { DatawarehouseTableForm } from '../DataWarehouseTableForm'
+import { Field } from 'lib/forms/Field'
+import stripeLogo from 'public/stripe-logo.svg'
+
+interface SourceModalProps extends LemonModalProps {}
+
+export default function SourceModal(props: SourceModalProps): JSX.Element {
+ const { tableLoading, isExternalDataSourceSubmitting, selectedConnector, isManualLinkFormVisible, connectors } =
+ useValues(sourceModalLogic)
+ const { selectConnector, toggleManualLinkFormVisible, resetExternalDataSource, resetTable } =
+ useActions(sourceModalLogic)
+
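+    // The modal shows one of three views: the credentials form for the selected connector, the manual link form, or the connector menu.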
+ const MenuButton = (config: ConnectorConfigType): JSX.Element => {
+ const onClick = (): void => {
+ selectConnector(config)
+ }
+
+        return (
+            <LemonButton onClick={onClick} className="w-100" center type="secondary" disabledReason={config.disabledReason}>
+                <img src={stripeLogo} alt="Stripe logo" height={50} />
+            </LemonButton>
+        )
+    }
+
+ const onClear = (): void => {
+ selectConnector(null)
+ toggleManualLinkFormVisible(false)
+ resetExternalDataSource()
+ resetTable()
+ }
+
+ const onManualLinkClick = (): void => {
+ toggleManualLinkFormVisible(true)
+ }
+
+ const formToShow = (): JSX.Element => {
+ if (selectedConnector) {
+ return (
+                <Form logic={sourceModalLogic} formKey="externalDataSource" enableFormOnSubmit>
+                    <Field name="account_id" label="Account id">
+                        <LemonInput />
+                    </Field>
+                    <Field name="client_secret" label="Client secret">
+                        <LemonInput type="password" />
+                    </Field>
+                    <LemonButton type="primary" center htmlType="submit" loading={isExternalDataSourceSubmitting}>
+                        Link
+                    </LemonButton>
+                </Form>
+ )
+ }
+
+ if (isManualLinkFormVisible) {
+ return (
+                <div>
+                    <DatawarehouseTableForm
+                        footer={
+                            <>
+                                <LemonDivider />
+                                <div className="flex flex-row justify-end gap-2">
+                                    <LemonButton type="secondary" center onClick={onClear}>
+                                        Back
+                                    </LemonButton>
+                                    <LemonButton type="primary" center htmlType="submit" loading={tableLoading}>
+                                        Link
+                                    </LemonButton>
+                                </div>
+                            </>
+                        }
+                    />
+                </div>
+ )
+ }
+
+ return (
+            <div className="flex flex-col gap-2">
+                {connectors.map((config, index) => (
+                    <MenuButton key={config.name + '_' + index} {...config} />
+                ))}
+                <LemonButton onClick={onManualLinkClick} className="w-100" center type="secondary">
+                    Manual Link
+                </LemonButton>
+            </div>
+ )
+ }
+
+ return (
+        <LemonModal
+            {...props}
+            onAfterClose={() => onClear()}
+            title="Data Sources"
+            description={selectedConnector ? selectedConnector.caption : null}
+        >
+            {formToShow()}
+        </LemonModal>
+ )
+}
diff --git a/frontend/src/scenes/data-warehouse/external/dataWarehouseSceneLogic.tsx b/frontend/src/scenes/data-warehouse/external/dataWarehouseSceneLogic.tsx
index cd32de1ca8981..0706fa65b707e 100644
--- a/frontend/src/scenes/data-warehouse/external/dataWarehouseSceneLogic.tsx
+++ b/frontend/src/scenes/data-warehouse/external/dataWarehouseSceneLogic.tsx
@@ -1,4 +1,4 @@
-import { afterMount, connect, kea, path, selectors } from 'kea'
+import { actions, afterMount, connect, kea, path, reducers, selectors } from 'kea'
import { loaders } from 'kea-loaders'
import api, { PaginatedResponse } from 'lib/api'
import { DataWarehouseTable, ProductKey } from '~/types'
@@ -12,6 +12,17 @@ export const dataWarehouseSceneLogic = kea<dataWarehouseSceneLogicType>([
connect(() => ({
values: [userLogic, ['user']],
})),
+ actions({
+ toggleSourceModal: true,
+ }),
+ reducers({
+ isSourceModalOpen: [
+ false,
+ {
+ toggleSourceModal: (state) => !state,
+ },
+ ],
+ }),
loaders({
dataWarehouse: [
            null as PaginatedResponse<DataWarehouseTable> | null,
diff --git a/frontend/src/scenes/data-warehouse/external/sourceModalLogic.ts b/frontend/src/scenes/data-warehouse/external/sourceModalLogic.ts
new file mode 100644
index 0000000000000..a902950180adc
--- /dev/null
+++ b/frontend/src/scenes/data-warehouse/external/sourceModalLogic.ts
@@ -0,0 +1,107 @@
+import { actions, connect, kea, path, reducers, selectors, listeners } from 'kea'
+
+import type { sourceModalLogicType } from './sourceModalLogicType'
+import { forms } from 'kea-forms'
+import { ExternalDataStripeSourceCreatePayload } from '~/types'
+import api from 'lib/api'
+import { lemonToast } from '@posthog/lemon-ui'
+import { dataWarehouseTableLogic } from '../dataWarehouseTableLogic'
+import { dataWarehouseSceneLogic } from './dataWarehouseSceneLogic'
+import { router } from 'kea-router'
+import { urls } from 'scenes/urls'
+import { dataWarehouseSettingsLogic } from '../settings/dataWarehouseSettingsLogic'
+
+export interface ConnectorConfigType {
+ name: string
+ fields: string[]
+ caption: string
+ disabledReason: string | null
+}
+
+// TODO: add icon
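+// Connectors offered in the source modal; disabledReason is recomputed per team by the connectors selector below.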
+export const CONNECTORS: ConnectorConfigType[] = [
+ {
+ name: 'Stripe',
+        fields: ['account_id', 'client_secret'],
+        caption: 'Enter your Stripe credentials to link your Stripe account to PostHog',
+ disabledReason: null,
+ },
+]
+
+export const sourceModalLogic = kea<sourceModalLogicType>([
+ path(['scenes', 'data-warehouse', 'external', 'sourceModalLogic']),
+ actions({
+ selectConnector: (connector: ConnectorConfigType | null) => ({ connector }),
+ toggleManualLinkFormVisible: (visible: boolean) => ({ visible }),
+ }),
+ connect({
+ values: [dataWarehouseTableLogic, ['tableLoading'], dataWarehouseSettingsLogic, ['dataWarehouseSources']],
+ actions: [
+ dataWarehouseSceneLogic,
+ ['toggleSourceModal'],
+ dataWarehouseTableLogic,
+ ['resetTable'],
+ dataWarehouseSettingsLogic,
+ ['loadSources'],
+ ],
+ }),
+ reducers({
+ selectedConnector: [
+ null as ConnectorConfigType | null,
+ {
+ selectConnector: (_, { connector }) => connector,
+ },
+ ],
+ isManualLinkFormVisible: [
+ false,
+ {
+ toggleManualLinkFormVisible: (_, { visible }) => visible,
+ },
+ ],
+ }),
+ selectors({
+ showFooter: [
+ (s) => [s.selectedConnector, s.isManualLinkFormVisible],
+ (selectedConnector, isManualLinkFormVisible) => selectedConnector || isManualLinkFormVisible,
+ ],
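+        // Mark a connector as disabled once a source of the same type is already linked for this team.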
+ connectors: [
+ (s) => [s.dataWarehouseSources],
+ (sources) => {
+ return CONNECTORS.map((connector) => ({
+ ...connector,
+ disabledReason:
+ sources && sources.results.find((source) => source.source_type === connector.name)
+ ? 'Already linked'
+ : null,
+ }))
+ },
+ ],
+ }),
+ forms(() => ({
+ externalDataSource: {
+ defaults: { account_id: '', client_secret: '' } as ExternalDataStripeSourceCreatePayload,
+ errors: ({ account_id, client_secret }) => {
+ return {
+ account_id: !account_id && 'Please enter an account id.',
+ client_secret: !client_secret && 'Please enter a client secret.',
+ }
+ },
+ submit: async (payload: ExternalDataStripeSourceCreatePayload) => {
+ const newResource = await api.externalDataSources.create(payload)
+ return newResource
+ },
+ },
+ })),
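+    // After a successful submit: close the modal, reset the form, refresh the linked sources, and go to the settings page.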
+ listeners(({ actions }) => ({
+ submitExternalDataSourceSuccess: () => {
+ lemonToast.success('New Data Resource Created')
+ actions.toggleSourceModal()
+ actions.resetExternalDataSource()
+ actions.loadSources()
+ router.actions.push(urls.dataWarehouseSettings())
+ },
+ submitExternalDataSourceFailure: () => {
+ lemonToast.error('Error creating new Data Resource. Check that provided credentials are valid.')
+ },
+ })),
+])
diff --git a/frontend/src/scenes/data-warehouse/settings/DataWarehouseSettingsScene.tsx b/frontend/src/scenes/data-warehouse/settings/DataWarehouseSettingsScene.tsx
new file mode 100644
index 0000000000000..94f3204e41fe7
--- /dev/null
+++ b/frontend/src/scenes/data-warehouse/settings/DataWarehouseSettingsScene.tsx
@@ -0,0 +1,85 @@
+import { LemonButton, LemonTable, LemonTag } from '@posthog/lemon-ui'
+import { PageHeader } from 'lib/components/PageHeader'
+import { SceneExport } from 'scenes/sceneTypes'
+import { dataWarehouseSettingsLogic } from './dataWarehouseSettingsLogic'
+import { useActions, useValues } from 'kea'
+import { dataWarehouseSceneLogic } from '../external/dataWarehouseSceneLogic'
+import SourceModal from '../external/SourceModal'
+import { featureFlagLogic } from 'lib/logic/featureFlagLogic'
+import { FEATURE_FLAGS } from 'lib/constants'
+
+export const scene: SceneExport = {
+ component: DataWarehouseSettingsScene,
+ logic: dataWarehouseSettingsLogic,
+}
+
+const StatusTagSetting = {
+ running: 'default',
+ succeeded: 'primary',
+ error: 'danger',
+}
+
+export function DataWarehouseSettingsScene(): JSX.Element {
+ const { dataWarehouseSources, dataWarehouseSourcesLoading } = useValues(dataWarehouseSettingsLogic)
+ const { toggleSourceModal } = useActions(dataWarehouseSceneLogic)
+ const { isSourceModalOpen } = useValues(dataWarehouseSceneLogic)
+ const { featureFlags } = useValues(featureFlagLogic)
+
+ return (
+        <div>
+            <PageHeader
+                title={
+                    <div className="flex items-center gap-2">
+                        Data Warehouse
+                        <LemonTag type="warning">Beta</LemonTag>
+                    </div>
+                }
+                buttons={
+                    featureFlags[FEATURE_FLAGS.DATA_WAREHOUSE_EXTERNAL_LINK] ? (
+                        <LemonButton type="primary" onClick={() => toggleSourceModal()}>
+                            Link Source
+                        </LemonButton>
+                    ) : undefined
+                }
+                caption={
+                    <div>
+                        Linked data sources will appear here. Data sources can take a while to sync depending on the
+                        size of the source.
+                    </div>
+                }
+            />
+            <LemonTable
+                dataSource={dataWarehouseSources?.results ?? []}
+                loading={dataWarehouseSourcesLoading}
+                columns={[
+                    {
+                        title: 'Source Type',
+                        key: 'source_type',
+                        render: (_, source) => source.source_type,
+                    },
+                    {
+                        title: 'Status',
+                        key: 'status',
+                        render: (_, source) => {
+                            return (
+                                <LemonTag type={StatusTagSetting[source.status] || 'default'}>
+                                    {source.status}
+                                </LemonTag>
+                            )
+                        },
+                    },
+                ]}
+            />
+            <SourceModal isOpen={isSourceModalOpen} onClose={toggleSourceModal} />
+        </div>
+ )
+}
diff --git a/frontend/src/scenes/data-warehouse/settings/dataWarehouseSettingsLogic.ts b/frontend/src/scenes/data-warehouse/settings/dataWarehouseSettingsLogic.ts
new file mode 100644
index 0000000000000..5be2cb17dfbf5
--- /dev/null
+++ b/frontend/src/scenes/data-warehouse/settings/dataWarehouseSettingsLogic.ts
@@ -0,0 +1,41 @@
+import { afterMount, kea, path, selectors } from 'kea'
+
+import type { dataWarehouseSettingsLogicType } from './dataWarehouseSettingsLogicType'
+import { loaders } from 'kea-loaders'
+import api, { PaginatedResponse } from 'lib/api'
+import { ExternalDataStripeSource, Breadcrumb } from '~/types'
+import { urls } from 'scenes/urls'
+
+export interface DataWarehouseSource {}
+
+export const dataWarehouseSettingsLogic = kea<dataWarehouseSettingsLogicType>([
+ path(['scenes', 'data-warehouse', 'settings', 'dataWarehouseSettingsLogic']),
+ loaders({
+ dataWarehouseSources: [
+            null as PaginatedResponse<ExternalDataStripeSource> | null,
+ {
+ loadSources: async () => {
+ return api.externalDataSources.list()
+ },
+ },
+ ],
+ }),
+ selectors({
+ breadcrumbs: [
+ () => [],
+ (): Breadcrumb[] => [
+ {
+ name: `Data Warehouse`,
+ path: urls.dataWarehouseExternal(),
+ },
+ {
+ name: 'Data Warehouse Settings',
+ path: urls.dataWarehouseSettings(),
+ },
+ ],
+ ],
+ }),
+ afterMount(({ actions }) => {
+ actions.loadSources()
+ }),
+])
diff --git a/frontend/src/scenes/sceneLogic.ts b/frontend/src/scenes/sceneLogic.ts
index 6663d51bc6ed5..db2831a02c992 100644
--- a/frontend/src/scenes/sceneLogic.ts
+++ b/frontend/src/scenes/sceneLogic.ts
@@ -35,10 +35,10 @@ const sceneNavAlias: Partial<Record<Scene, Scene>> = {
[Scene.EarlyAccessFeature]: Scene.EarlyAccessFeatures,
[Scene.Survey]: Scene.Surveys,
[Scene.SurveyTemplates]: Scene.Surveys,
- [Scene.DataWarehouseTable]: Scene.DataWarehouse,
[Scene.DataWarehousePosthog]: Scene.DataWarehouse,
[Scene.DataWarehouseExternal]: Scene.DataWarehouse,
[Scene.DataWarehouseSavedQueries]: Scene.DataWarehouse,
+ [Scene.DataWarehouseSettings]: Scene.DataWarehouse,
[Scene.AppMetrics]: Scene.Apps,
[Scene.ReplaySingle]: Scene.Replay,
[Scene.ReplayPlaylist]: Scene.ReplayPlaylist,
diff --git a/frontend/src/scenes/sceneTypes.ts b/frontend/src/scenes/sceneTypes.ts
index 3ff7072ca8404..dc17598a8ee81 100644
--- a/frontend/src/scenes/sceneTypes.ts
+++ b/frontend/src/scenes/sceneTypes.ts
@@ -48,7 +48,7 @@ export enum Scene {
DataWarehousePosthog = 'DataWarehousePosthog',
DataWarehouseExternal = 'DataWarehouseExternal',
DataWarehouseSavedQueries = 'DataWarehouseSavedQueries',
- DataWarehouseTable = 'DataWarehouseTable',
+ DataWarehouseSettings = 'DataWarehouseSettings',
OrganizationSettings = 'OrganizationSettings',
OrganizationCreateFirst = 'OrganizationCreate',
ProjectHomepage = 'ProjectHomepage',
diff --git a/frontend/src/scenes/scenes.ts b/frontend/src/scenes/scenes.ts
index 78153fac0cf1e..3d3b3baa214db 100644
--- a/frontend/src/scenes/scenes.ts
+++ b/frontend/src/scenes/scenes.ts
@@ -187,9 +187,9 @@ export const sceneConfigurations: Partial<Record<Scene, SceneConfig>> = {
projectBased: true,
name: 'Data Warehouse',
},
- [Scene.DataWarehouseTable]: {
+ [Scene.DataWarehouseSettings]: {
projectBased: true,
- name: 'Data Warehouse Table',
+ name: 'Data Warehouse Settings',
},
[Scene.EarlyAccessFeatures]: {
projectBased: true,
@@ -455,10 +455,10 @@ export const routes: Record<string, Scene> = {
[urls.survey(':id')]: Scene.Survey,
[urls.surveyTemplates()]: Scene.SurveyTemplates,
[urls.dataWarehouse()]: Scene.DataWarehouse,
- [urls.dataWarehouseTable(':id')]: Scene.DataWarehouseTable,
[urls.dataWarehousePosthog()]: Scene.DataWarehousePosthog,
[urls.dataWarehouseExternal()]: Scene.DataWarehouseExternal,
[urls.dataWarehouseSavedQueries()]: Scene.DataWarehouseSavedQueries,
+ [urls.dataWarehouseSettings()]: Scene.DataWarehouseSettings,
[urls.featureFlags()]: Scene.FeatureFlags,
[urls.featureFlag(':id')]: Scene.FeatureFlag,
[urls.annotations()]: Scene.Annotations,
diff --git a/frontend/src/scenes/urls.ts b/frontend/src/scenes/urls.ts
index a0253f81ffc21..3e6823c7fa1b6 100644
--- a/frontend/src/scenes/urls.ts
+++ b/frontend/src/scenes/urls.ts
@@ -111,11 +111,11 @@ export const urls = {
surveys: (): string => '/surveys',
survey: (id: ':id' | 'new' | string): string => `/surveys/${id}`,
surveyTemplates: (): string => '/survey_templates',
- dataWarehouse: (): string => '/warehouse',
- dataWarehouseTable: (id: ':id' | 'new' | string): string => `/warehouse/${id}`,
+ dataWarehouse: (): string => '/data-warehouse',
dataWarehousePosthog: (): string => '/data-warehouse/posthog',
dataWarehouseExternal: (): string => '/data-warehouse/external',
dataWarehouseSavedQueries: (): string => '/data-warehouse/views',
+ dataWarehouseSettings: (): string => '/data-warehouse/settings',
annotations: (): string => '/annotations',
annotation: (id: AnnotationType['id'] | ':id'): string => `/annotations/${id}`,
projectApps: (tab?: PluginTab): string => `/project/apps${tab ? `?tab=${tab}` : ''}`,
diff --git a/frontend/src/types.ts b/frontend/src/types.ts
index fe163c0105b5b..c515f2e22af3d 100644
--- a/frontend/src/types.ts
+++ b/frontend/src/types.ts
@@ -3141,6 +3141,19 @@ export interface DataWarehouseViewLink {
from_join_key?: string
}
+export interface ExternalDataStripeSourceCreatePayload {
+ account_id: string
+ client_secret: string
+}
+
+export interface ExternalDataStripeSource {
+ id: string
+ source_id: string
+ connection_id: string
+ status: string
+ source_type: string
+}
+
export type BatchExportDestinationS3 = {
type: 'S3'
config: {
diff --git a/latest_migrations.manifest b/latest_migrations.manifest
index e74eafecde55d..a5b2111866ab5 100644
--- a/latest_migrations.manifest
+++ b/latest_migrations.manifest
@@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name
ee: 0015_add_verified_properties
otp_static: 0002_throttling
otp_totp: 0002_auto_20190420_0723
-posthog: 0357_add_redshift_batch_export_destination
+posthog: 0358_externaldatasource
sessions: 0001_initial
social_django: 0010_uid_db_index
two_factor: 0007_auto_20201201_1019
diff --git a/posthog/api/__init__.py b/posthog/api/__init__.py
index 8003f2139ce2c..2756855a074f9 100644
--- a/posthog/api/__init__.py
+++ b/posthog/api/__init__.py
@@ -3,7 +3,7 @@
from posthog.api.routing import DefaultRouterPlusPlus
from posthog.batch_exports import http as batch_exports
from posthog.settings import EE_AVAILABLE
-from posthog.warehouse.api import saved_query, table, view_link
+from posthog.warehouse.api import external_data_source, saved_query, table, view_link
from ..session_recordings.session_recording_api import SessionRecordingViewSet
from . import (
activity_log,
@@ -210,6 +210,14 @@ def api_not_found(request):
projects_router.register(r"tags", tagged_item.TaggedItemViewSet, "project_tags", ["team_id"])
projects_router.register(r"query", query.QueryViewSet, "project_query", ["team_id"])
+# External data resources
+projects_router.register(
+ r"external_data_sources",
+ external_data_source.ExternalDataSourceViewSet,
+ "project_external_data_sources",
+ ["team_id"],
+)
+
# General endpoints (shared across CH & PG)
router.register(r"login", authentication.LoginViewSet, "login")
router.register(r"login/token", authentication.TwoFactorViewSet)
diff --git a/posthog/celery.py b/posthog/celery.py
index 1eb5bb40db888..a7b62848bfab3 100644
--- a/posthog/celery.py
+++ b/posthog/celery.py
@@ -327,6 +327,12 @@ def setup_periodic_tasks(sender: Celery, **kwargs):
name="delete expired exported assets",
)
+ sender.add_periodic_task(
+ crontab(minute="*/10"),
+ sync_datawarehouse_sources.s(),
+ name="sync datawarehouse sources that have settled in s3 bucket",
+ )
+
# Set up clickhouse query instrumentation
@task_prerun.connect
@@ -1081,3 +1087,13 @@ def ee_persist_finished_recordings():
pass
else:
persist_finished_recordings()
+
+
+@app.task(ignore_result=True)
+def sync_datawarehouse_sources():
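+    # Import lazily and skip silently if the warehouse module is not available (mirrors the recordings task above).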
+ try:
+ from posthog.warehouse.sync_resource import sync_resources
+ except ImportError:
+ pass
+ else:
+ sync_resources()
diff --git a/posthog/migrations/0358_externaldatasource.py b/posthog/migrations/0358_externaldatasource.py
new file mode 100644
index 0000000000000..d92285093803f
--- /dev/null
+++ b/posthog/migrations/0358_externaldatasource.py
@@ -0,0 +1,42 @@
+# Generated by Django 3.2.19 on 2023-09-19 14:19
+
+from django.conf import settings
+from django.db import migrations, models
+import django.db.models.deletion
+import posthog.models.utils
+
+
+class Migration(migrations.Migration):
+ dependencies = [
+ ("posthog", "0357_add_redshift_batch_export_destination"),
+ ]
+
+ operations = [
+ migrations.CreateModel(
+ name="ExternalDataSource",
+ fields=[
+ ("created_at", models.DateTimeField(auto_now_add=True)),
+ (
+ "id",
+ models.UUIDField(
+ default=posthog.models.utils.UUIDT, editable=False, primary_key=True, serialize=False
+ ),
+ ),
+ ("source_id", models.CharField(max_length=400)),
+ ("connection_id", models.CharField(max_length=400)),
+ (
+ "created_by",
+ models.ForeignKey(
+ blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, to=settings.AUTH_USER_MODEL
+ ),
+ ),
+ ("status", models.CharField(max_length=400)),
+ ("team", models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to="posthog.team")),
+ ("source_type", models.CharField(choices=[("Stripe", "Stripe")], max_length=128)),
+ ("are_tables_created", models.BooleanField(default=False)),
+ ],
+ options={
+ "abstract": False,
+ },
+ ),
+ ]
diff --git a/posthog/settings/__init__.py b/posthog/settings/__init__.py
index 5f915f3c1d6f5..58010175ea580 100644
--- a/posthog/settings/__init__.py
+++ b/posthog/settings/__init__.py
@@ -38,6 +38,7 @@
from posthog.settings.object_storage import *
from posthog.settings.temporal import *
from posthog.settings.web import *
+from posthog.settings.airbyte import *
from posthog.settings.utils import get_from_env, str_to_bool
diff --git a/posthog/settings/airbyte.py b/posthog/settings/airbyte.py
new file mode 100644
index 0000000000000..c337561076d12
--- /dev/null
+++ b/posthog/settings/airbyte.py
@@ -0,0 +1,8 @@
+import os
+
+AIRBYTE_API_KEY = os.getenv("AIRBYTE_API_KEY", None)
+AIRBYTE_WORKSPACE_ID = os.getenv("AIRBYTE_WORKSPACE_ID", None)
+AIRBYTE_BUCKET_REGION = os.getenv("AIRBYTE_BUCKET_REGION", None)
+AIRBYTE_BUCKET_KEY = os.getenv("AIRBYTE_BUCKET_KEY", None)
+AIRBYTE_BUCKET_SECRET = os.getenv("AIRBYTE_BUCKET_SECRET", None)
+AIRBYTE_BUCKET_URL = os.getenv("AIRBYTE_BUCKET_URL", None)
diff --git a/posthog/warehouse/api/external_data_source.py b/posthog/warehouse/api/external_data_source.py
new file mode 100644
index 0000000000000..c098e88fb5a9c
--- /dev/null
+++ b/posthog/warehouse/api/external_data_source.py
@@ -0,0 +1,82 @@
+from rest_framework import status
+from rest_framework.request import Request
+from rest_framework.response import Response
+from posthog.permissions import OrganizationMemberPermissions
+from rest_framework.exceptions import NotAuthenticated
+from rest_framework.permissions import IsAuthenticated
+from rest_framework import filters, serializers, viewsets
+from posthog.warehouse.models import ExternalDataSource
+from posthog.warehouse.external_data_source.source import StripeSourcePayload, create_stripe_source, delete_source
+from posthog.warehouse.external_data_source.connection import create_connection, start_sync
+from posthog.warehouse.external_data_source.destination import create_destination, delete_destination
+from posthog.api.routing import StructuredViewSetMixin
+
+from posthog.models import User
+from typing import Any
+
+
+class ExternalDataSourceSerializers(serializers.ModelSerializer):
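+    # account_id and client_secret are write-only: accepted on create but never returned in responses.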
+ account_id = serializers.CharField(write_only=True)
+ client_secret = serializers.CharField(write_only=True)
+
+ class Meta:
+ model = ExternalDataSource
+ fields = ["id", "source_id", "created_at", "created_by", "status", "client_secret", "account_id", "source_type"]
+ read_only_fields = ["id", "source_id", "created_by", "created_at", "status", "source_type"]
+
+
+class ExternalDataSourceViewSet(StructuredViewSetMixin, viewsets.ModelViewSet):
+ """
+    Create, Read, Update and Delete External Data Sources.
+ """
+
+ queryset = ExternalDataSource.objects.all()
+ serializer_class = ExternalDataSourceSerializers
+ permission_classes = [IsAuthenticated, OrganizationMemberPermissions]
+ filter_backends = [filters.SearchFilter]
+ search_fields = ["source_id"]
+ ordering = "-created_at"
+
+ def get_queryset(self):
+ if not isinstance(self.request.user, User) or self.request.user.current_team is None:
+ raise NotAuthenticated()
+
+ if self.action == "list":
+ return self.queryset.filter(team_id=self.team_id).prefetch_related("created_by").order_by(self.ordering)
+
+ return self.queryset.filter(team_id=self.team_id).prefetch_related("created_by").order_by(self.ordering)
+
+ def create(self, request: Request, *args: Any, **kwargs: Any) -> Response:
+ account_id = request.data["account_id"]
+ client_secret = request.data["client_secret"]
+
+ stripe_payload = StripeSourcePayload(
+ account_id=account_id,
+ client_secret=client_secret,
+ )
+ new_source = create_stripe_source(stripe_payload)
+
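+        # If a later step fails, roll back the Airbyte resources created so far to avoid leaving orphans.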
+ try:
+ new_destination = create_destination(self.team_id)
+ except Exception as e:
+ delete_source(new_source.source_id)
+ raise e
+
+ try:
+ new_connection = create_connection(new_source.source_id, new_destination.destination_id)
+ except Exception as e:
+ delete_source(new_source.source_id)
+ delete_destination(new_destination.destination_id)
+ raise e
+
+ ExternalDataSource.objects.create(
+ source_id=new_source.source_id,
+ connection_id=new_connection.connection_id,
+ team=self.team,
+ status="running",
+ source_type="Stripe",
+ )
+
+ start_sync(new_connection.connection_id)
+
+ return Response(status=status.HTTP_201_CREATED, data={"source_id": new_source.source_id})
diff --git a/posthog/warehouse/external_data_source/connection.py b/posthog/warehouse/external_data_source/connection.py
new file mode 100644
index 0000000000000..b0ab472430ccc
--- /dev/null
+++ b/posthog/warehouse/external_data_source/connection.py
@@ -0,0 +1,106 @@
+import requests
+from django.conf import settings
+from pydantic import BaseModel
+from typing import Dict
+
+AIRBYTE_CONNECTION_URL = "https://api.airbyte.com/v1/connections"
+AIRBYTE_JOBS_URL = "https://api.airbyte.com/v1/jobs"
+
+
+class ExternalDataConnection(BaseModel):
+ connection_id: str
+ source_id: str
+ destination_id: str
+ name: str
+ workspace_id: str
+
+
+def create_connection(source_id: str, destination_id: str) -> ExternalDataConnection:
+ token = settings.AIRBYTE_API_KEY
+ if not token:
+ raise ValueError("AIRBYTE_API_KEY must be set in order to create a source.")
+
+ headers = {"accept": "application/json", "content-type": "application/json", "Authorization": f"Bearer {token}"}
+
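+    # Airbyte expects a Quartz-style (6-field) cron expression; "0 0 0 * * ?" triggers the sync daily at midnight.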
+ payload = {
+ "schedule": {"scheduleType": "cron", "cronExpression": "0 0 0 * * ?"},
+ "namespaceFormat": None,
+ "sourceId": source_id,
+ "destinationId": destination_id,
+ }
+
+ response = requests.post(AIRBYTE_CONNECTION_URL, json=payload, headers=headers)
+ response_payload = response.json()
+
+ if not response.ok:
+ raise ValueError(response_payload["message"])
+
+ update_connection_stream(response_payload["connectionId"], headers)
+
+ return ExternalDataConnection(
+ source_id=response_payload["sourceId"],
+ name=response_payload["name"],
+ connection_id=response_payload["connectionId"],
+ workspace_id=response_payload["workspaceId"],
+ destination_id=response_payload["destinationId"],
+ )
+
+
+def update_connection_stream(connection_id: str, headers: Dict):
+ connection_id_url = f"{AIRBYTE_CONNECTION_URL}/{connection_id}"
+
+ # TODO: hardcoded to stripe stream right now
+ payload = {
+ "configurations": {"streams": [{"name": "customers", "syncMode": "full_refresh_overwrite"}]},
+ "schedule": {"scheduleType": "cron", "cronExpression": "0 0 0 * * ?"},
+ "namespaceFormat": None,
+ }
+
+ response = requests.patch(connection_id_url, json=payload, headers=headers)
+ response_payload = response.json()
+
+ if not response.ok:
+ raise ValueError(response_payload["message"])
+
+
+def delete_connection(connection_id: str) -> None:
+ token = settings.AIRBYTE_API_KEY
+ if not token:
+ raise ValueError("AIRBYTE_API_KEY must be set in order to delete a connection.")
+
+ headers = {"Authorization": f"Bearer {token}"}
+ response = requests.delete(AIRBYTE_CONNECTION_URL + "/" + connection_id, headers=headers)
+
+ if not response.ok:
+ raise ValueError(response.json()["message"])
+
+
+# Fire and forget
+def start_sync(connection_id: str):
+ token = settings.AIRBYTE_API_KEY
+ if not token:
+ raise ValueError("AIRBYTE_API_KEY must be set in order to start sync.")
+
+ headers = {"accept": "application/json", "content-type": "application/json", "Authorization": f"Bearer {token}"}
+ payload = {"jobType": "sync", "connectionId": connection_id}
+
+ requests.post(AIRBYTE_JOBS_URL, json=payload, headers=headers)
+
+
+def retrieve_sync(connection_id: str):
+ token = settings.AIRBYTE_API_KEY
+ headers = {"accept": "application/json", "content-type": "application/json", "Authorization": f"Bearer {token}"}
+ params = {"connectionId": connection_id, "limit": 1}
+ response = requests.get(AIRBYTE_JOBS_URL, params=params, headers=headers)
+ response_payload = response.json()
+
+ if not response.ok:
+ raise ValueError(response_payload["message"])
+
+ data = response_payload.get("data", [])
+ if not data:
+ return None
+
+ latest_job = response_payload["data"][0]
+
+ return latest_job
diff --git a/posthog/warehouse/external_data_source/destination.py b/posthog/warehouse/external_data_source/destination.py
new file mode 100644
index 0000000000000..46b680ea42f76
--- /dev/null
+++ b/posthog/warehouse/external_data_source/destination.py
@@ -0,0 +1,52 @@
+import requests
+from django.conf import settings
+from pydantic import BaseModel
+
+AIRBYTE_DESTINATION_URL = "https://api.airbyte.com/v1/destinations"
+
+
+class ExternalDataDestination(BaseModel):
+ destination_id: str
+
+
+def create_destination(team_id: int) -> ExternalDataDestination:
+ token = settings.AIRBYTE_API_KEY
+ if not token:
+ raise ValueError("AIRBYTE_API_KEY must be set in order to create a source.")
+
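+    # Each team gets its own prefix in the shared S3 bucket; Airbyte writes uncompressed Parquet files there.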
+ payload = {
+ "configuration": {
+ "format": {"format_type": "Parquet", "compression_codec": "UNCOMPRESSED"},
+ "destinationType": "s3",
+ "s3_bucket_region": settings.AIRBYTE_BUCKET_REGION,
+ "access_key_id": settings.AIRBYTE_BUCKET_KEY,
+ "secret_access_key": settings.AIRBYTE_BUCKET_SECRET,
+ "s3_bucket_name": "databeach-hackathon",
+ "s3_bucket_path": f"airbyte/{team_id}",
+ },
+ "name": f"S3/{team_id}",
+ "workspaceId": settings.AIRBYTE_WORKSPACE_ID,
+ }
+ headers = {"accept": "application/json", "content-type": "application/json", "authorization": f"Bearer {token}"}
+
+ response = requests.post(AIRBYTE_DESTINATION_URL, json=payload, headers=headers)
+ response_payload = response.json()
+
+ if not response.ok:
+ raise ValueError(response_payload["message"])
+
+ return ExternalDataDestination(
+ destination_id=response_payload["destinationId"],
+ )
+
+
+def delete_destination(destination_id: str) -> None:
+ token = settings.AIRBYTE_API_KEY
+ if not token:
+        raise ValueError("AIRBYTE_API_KEY must be set in order to delete a destination.")
+ headers = {"authorization": f"Bearer {token}"}
+
+ response = requests.delete(AIRBYTE_DESTINATION_URL + "/" + destination_id, headers=headers)
+
+ if not response.ok:
+ raise ValueError(response.json()["message"])
diff --git a/posthog/warehouse/external_data_source/source.py b/posthog/warehouse/external_data_source/source.py
new file mode 100644
index 0000000000000..c142ce0c2a861
--- /dev/null
+++ b/posthog/warehouse/external_data_source/source.py
@@ -0,0 +1,106 @@
+import requests
+from django.conf import settings
+from posthog.models.utils import UUIDT
+from pydantic import BaseModel, field_validator
+from typing import Dict, Optional
+import datetime as dt
+
+AIRBYTE_SOURCE_URL = "https://api.airbyte.com/v1/sources"
+
+
+class StripeSourcePayload(BaseModel):
+ account_id: str
+ client_secret: str
+ start_date: Optional[dt.datetime] = None
+ lookback_window_days: Optional[int] = None
+ slice_range: Optional[int] = None
+
+ @field_validator("account_id")
+ @classmethod
+ def account_id_is_valid_uuid(cls, v: str) -> str:
+        if not UUIDT.is_valid_uuid(v):
+            raise ValueError("account_id must be a valid UUID.")
+ return v
+
+ @field_validator("start_date")
+ @classmethod
+ def valid_iso_start_date(cls, v: Optional[str]) -> Optional[str]:
+ from posthog.batch_exports.http import validate_date_input
+
+ if not v:
+ return v
+
+ try:
+ validate_date_input(v)
+ except ValueError:
+ raise ValueError("start_date must be a valid ISO date string.")
+ return v
+
+
+class ExternalDataSource(BaseModel):
+ source_id: str
+ name: str
+ source_type: str
+ workspace_id: str
+
+
+def create_stripe_source(payload: StripeSourcePayload) -> ExternalDataSource:
+ workspace_id = settings.AIRBYTE_WORKSPACE_ID
+ if not workspace_id:
+ raise ValueError("AIRBYTE_WORKSPACE_ID must be set in order to create a source.")
+
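+    # Only forward the optional Stripe settings that were actually provided.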
+ optional_config = {}
+ if payload.start_date:
+ optional_config["start_date"] = payload.start_date.isoformat()
+
+ if payload.lookback_window_days:
+ optional_config["lookback_window_days"] = payload.lookback_window_days
+
+ if payload.slice_range:
+ optional_config["slice_range"] = payload.slice_range
+
+ payload = {
+ "configuration": {
+ "sourceType": "stripe",
+ "account_id": payload.account_id,
+ "client_secret": payload.client_secret,
+ **optional_config,
+ },
+ "name": "stripe source",
+ "workspaceId": workspace_id,
+ }
+ return _create_source(payload)
+
+
+def _create_source(payload: Dict) -> ExternalDataSource:
+ token = settings.AIRBYTE_API_KEY
+ if not token:
+ raise ValueError("AIRBYTE_API_KEY must be set in order to create a source.")
+
+ headers = {"accept": "application/json", "content-type": "application/json", "Authorization": f"Bearer {token}"}
+
+ response = requests.post(AIRBYTE_SOURCE_URL, json=payload, headers=headers)
+ response_payload = response.json()
+ if not response.ok:
+ raise ValueError(response_payload["message"])
+
+ return ExternalDataSource(
+ source_id=response_payload["sourceId"],
+ name=response_payload["name"],
+ source_type=response_payload["sourceType"],
+ workspace_id=response_payload["workspaceId"],
+ )
+
+
+def delete_source(source_id):
+ token = settings.AIRBYTE_API_KEY
+ if not token:
+ raise ValueError("AIRBYTE_API_KEY must be set in order to delete a source.")
+ headers = {"authorization": f"Bearer {token}"}
+
+ response = requests.delete(AIRBYTE_SOURCE_URL + "/" + source_id, headers=headers)
+
+ if not response.ok:
+ raise ValueError(response.json()["message"])
diff --git a/posthog/warehouse/models/__init__.py b/posthog/warehouse/models/__init__.py
index 0af35ffb15a06..c3286f1b7f6bb 100644
--- a/posthog/warehouse/models/__init__.py
+++ b/posthog/warehouse/models/__init__.py
@@ -2,3 +2,4 @@
from .credential import *
from .datawarehouse_saved_query import *
from .view_link import *
+from .external_data_source import *
diff --git a/posthog/warehouse/models/external_data_source.py b/posthog/warehouse/models/external_data_source.py
new file mode 100644
index 0000000000000..60171a7035cc2
--- /dev/null
+++ b/posthog/warehouse/models/external_data_source.py
@@ -0,0 +1,17 @@
+from posthog.models.utils import UUIDModel, CreatedMetaFields, sane_repr
+from django.db import models
+from posthog.models.team import Team
+
+
+class ExternalDataSource(CreatedMetaFields, UUIDModel):
+ class Type(models.TextChoices):
+ STRIPE = "Stripe", "Stripe"
+
+ source_id: models.CharField = models.CharField(max_length=400)
+ connection_id: models.CharField = models.CharField(max_length=400)
+ team: models.ForeignKey = models.ForeignKey(Team, on_delete=models.CASCADE)
+ status: models.CharField = models.CharField(max_length=400)
+ source_type: models.CharField = models.CharField(max_length=128, choices=Type.choices)
+ are_tables_created: models.BooleanField = models.BooleanField(default=False)
+
+ __repr__ = sane_repr("source_id")
diff --git a/posthog/warehouse/sync_resource.py b/posthog/warehouse/sync_resource.py
new file mode 100644
index 0000000000000..7b9c20ce3deb4
--- /dev/null
+++ b/posthog/warehouse/sync_resource.py
@@ -0,0 +1,65 @@
+from posthog.warehouse.models.external_data_source import ExternalDataSource
+from posthog.warehouse.models import DataWarehouseCredential, DataWarehouseTable
+from posthog.warehouse.external_data_source.connection import retrieve_sync
+from posthog.celery import app
+
+from django.conf import settings
+import structlog
+
+logger = structlog.get_logger(__name__)
+
+
+def sync_resources():
+ resources = ExternalDataSource.objects.filter(are_tables_created=False, status__in=["running", "error"])
+
+ for resource in resources:
+ _sync_resource.delay(resource.pk)
+
+
+@app.task(ignore_result=True)
+def _sync_resource(resource_id):
+ resource = ExternalDataSource.objects.get(pk=resource_id)
+
+ try:
+ job = retrieve_sync(resource.connection_id)
+ except Exception as e:
+ logger.exception("Sync Resource failed with an unexpected exception.", exc_info=e)
+ resource.status = "error"
+ resource.save()
+ return
+
+ if job is None:
+        logger.error(f"No jobs found for connection: {resource.connection_id}")
+        resource.status = "error"
+        resource.save()
+        return
+
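+    # On the first successful sync, register a DataWarehouseTable pointing at the Parquet files Airbyte wrote to S3.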
+ if job["status"] == "succeeded":
+ resource = ExternalDataSource.objects.get(pk=resource_id)
+ credential, _ = DataWarehouseCredential.objects.get_or_create(
+ team_id=resource.team.pk,
+ access_key=settings.AIRBYTE_BUCKET_KEY,
+ access_secret=settings.AIRBYTE_BUCKET_SECRET,
+ )
+
+ data = {
+ "credential": credential,
+ "name": "stripe_customers",
+ "format": "Parquet",
+ "url_pattern": f"{settings.AIRBYTE_BUCKET_URL}/{resource.team.pk}/customers/*.parquet",
+ "team_id": resource.team.pk,
+ }
+
+ table = DataWarehouseTable(**data)
+ try:
+ table.columns = table.get_columns()
+ except Exception as e:
+ logger.exception("Sync Resource failed with an unexpected exception.", exc_info=e)
+ else:
+ table.save()
+
+ resource.are_tables_created = True
+ resource.status = job["status"]
+ resource.save()
+ else:
+ resource.status = job["status"]
+ resource.save()