Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(data-warehouse): adding payload types #18702

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions frontend/public/postgres-logo.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion frontend/src/lib/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1623,7 +1623,7 @@ const api = {
async list(): Promise<PaginatedResponse<ExternalDataSource>> {
return await new ApiRequest().externalDataSources().get()
},
async create(data: ExternalDataSourceCreatePayload): Promise<ExternalDataSourceCreatePayload> {
async create(data: ExternalDataSourceCreatePayload): Promise<ExternalDataSource> {
return await new ApiRequest().externalDataSources().create({ data })
},
async delete(sourceId: ExternalDataSource['id']): Promise<void> {
Expand Down
93 changes: 48 additions & 45 deletions frontend/src/scenes/data-warehouse/external/SourceModal.tsx
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import { LemonButton, LemonDivider, LemonInput, LemonModal, LemonModalProps } from '@posthog/lemon-ui'
import { Form } from 'kea-forms'
import { ConnectorConfigType, sourceModalLogic } from './sourceModalLogic'
import { ConnectorConfigType, FORM_PAYLOAD_TYPES, FormPayloadType, sourceModalLogic } from './sourceModalLogic'
import { useActions, useValues } from 'kea'
import { DatawarehouseTableForm } from '../new_table/DataWarehouseTableForm'
import { Field } from 'lib/forms/Field'
import stripeLogo from 'public/stripe-logo.svg'

interface SourceModalProps extends LemonModalProps {}

Expand All @@ -21,7 +20,7 @@ export default function SourceModal(props: SourceModalProps): JSX.Element {

return (
<LemonButton onClick={onClick} className="w-100" center type="secondary">
<img src={stripeLogo} alt={`stripe logo`} height={50} />
<img src={config.icon} alt={`stripe logo`} height={50} />
</LemonButton>
)
}
Expand All @@ -37,38 +36,40 @@ export default function SourceModal(props: SourceModalProps): JSX.Element {
toggleManualLinkFormVisible(true)
}

const formToShow = (): JSX.Element => {
const formPayloadTypeToField = (formPayloadType: FormPayloadType): JSX.Element => {
return (
<Field name={formPayloadType.name} label={formPayloadType.label}>
<LemonInput className="ph-ignore-input" autoFocus data-attr={formPayloadType.name + '_input'} />
</Field>
)
}

const buildPayloadTypeForm = (payloadType: string): JSX.Element => {
return (
<Form logic={sourceModalLogic} formKey={'externalDataSource'} className="space-y-4" enableFormOnSubmit>
{FORM_PAYLOAD_TYPES[payloadType].map(formPayloadTypeToField)}
<LemonDivider className="mt-4" />
<div className="mt-2 flex flex-row justify-end gap-2">
<LemonButton type="secondary" center data-attr="source-modal-back-button" onClick={onClear}>
Back
</LemonButton>
<LemonButton
type="primary"
center
htmlType="submit"
data-attr="source-link"
loading={isExternalDataSourceSubmitting}
>
Link
</LemonButton>
</div>
</Form>
)
}

const formToShow = (selectedConnector: ConnectorConfigType): JSX.Element => {
if (selectedConnector) {
return (
<Form logic={sourceModalLogic} formKey={'externalDataSource'} className="space-y-4" enableFormOnSubmit>
<Field name="account_id" label="Account Id">
<LemonInput className="ph-ignore-input" autoFocus data-attr="account-id" placeholder="acct_" />
</Field>
<Field name="client_secret" label="Client Secret">
<LemonInput
className="ph-ignore-input"
autoFocus
data-attr="client-secret"
placeholder="sklive"
/>
</Field>
<LemonDivider className="mt-4" />
<div className="mt-2 flex flex-row justify-end gap-2">
<LemonButton type="secondary" center data-attr="source-modal-back-button" onClick={onClear}>
Back
</LemonButton>
<LemonButton
type="primary"
center
htmlType="submit"
data-attr="source-link"
loading={isExternalDataSourceSubmitting}
>
Link
</LemonButton>
</div>
</Form>
)
return buildPayloadTypeForm(selectedConnector.name)
}

if (isManualLinkFormVisible) {
Expand Down Expand Up @@ -104,16 +105,7 @@ export default function SourceModal(props: SourceModalProps): JSX.Element {
)
}

return (
<div className="flex flex-col gap-2">
{connectors.map((config, index) => (
<MenuButton key={config.name + '_' + index} {...config} />
))}
<LemonButton onClick={onManualLinkClick} className="w-100" center type="secondary">
Manual Link
</LemonButton>
</div>
)
return <></>
}

return (
Expand All @@ -123,7 +115,18 @@ export default function SourceModal(props: SourceModalProps): JSX.Element {
title="Data Sources"
description={selectedConnector ? selectedConnector.caption : null}
>
{formToShow()}
{selectedConnector ? (
formToShow(selectedConnector)
) : (
<div className="flex flex-col gap-2">
{connectors.map((config, index) => (
<MenuButton key={config.name + '_' + index} {...config} />
))}
<LemonButton onClick={onManualLinkClick} className="w-100" center type="secondary">
Manual Link
</LemonButton>
</div>
)}
</LemonModal>
)
}
75 changes: 68 additions & 7 deletions frontend/src/scenes/data-warehouse/external/sourceModalLogic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,82 @@ import { dataWarehouseSceneLogic } from './dataWarehouseSceneLogic'
import { router } from 'kea-router'
import { urls } from 'scenes/urls'
import { dataWarehouseSettingsLogic } from '../settings/dataWarehouseSettingsLogic'
import stripeLogo from 'public/stripe-logo.svg'
import postgresLogo from 'public/postgres-logo.svg'

export interface ConnectorConfigType {
name: string
fields: string[]
caption: string
disabledReason: string | null
icon: string
}

// TODO: add icon
export const CONNECTORS: ConnectorConfigType[] = [
{
name: 'Stripe',
fields: ['accound_id', 'client_secret'],
name: 'stripe',
caption: 'Enter your Stripe credentials to link your Stripe to PostHog',
disabledReason: null,
icon: stripeLogo,
},
{
name: 'postgres',
caption: 'Enter your Postgres credentials to link your Postgres database to PostHog',
disabledReason: null,
icon: postgresLogo,
},
]

type FormTypes = 'input' | 'select'

export interface FormPayloadType {
name: string
type: FormTypes
label: string
}

export const FORM_PAYLOAD_TYPES: Record<string, FormPayloadType[]> = {
stripe: [
{
name: 'account_id',
type: 'input',
label: 'Account Id',
},
{
name: 'client_secret',
type: 'input',
label: 'Client Secret',
},
],
postgres: [
{
name: 'host',
type: 'input',
label: 'Host',
},
{
name: 'port',
type: 'input',
label: 'Port',
},
{
name: 'database',
type: 'input',
label: 'Database',
},
{
name: 'username',
type: 'input',
label: 'Username',
},
{
name: 'password',
type: 'input',
label: 'Password',
},
],
}

export const sourceModalLogic = kea<sourceModalLogicType>([
path(['scenes', 'data-warehouse', 'external', 'sourceModalLogic']),
actions({
Expand Down Expand Up @@ -79,16 +137,19 @@ export const sourceModalLogic = kea<sourceModalLogicType>([
}),
forms(() => ({
externalDataSource: {
defaults: { account_id: '', client_secret: '' } as ExternalDataSourceCreatePayload,
defaults: { account_id: '', client_secret: '' },
errors: ({ account_id, client_secret }) => {
return {
account_id: !account_id && 'Please enter an account id.',
client_secret: !client_secret && 'Please enter a client secret.',
}
},
submit: async (payload: ExternalDataSourceCreatePayload) => {
const newResource = await api.externalDataSources.create(payload)
return newResource
submit: async (payload) => {
await api.externalDataSources.create({
payload,
payload_type: 'stripe',
} as ExternalDataSourceCreatePayload)
return payload
},
},
})),
Expand Down
5 changes: 2 additions & 3 deletions frontend/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3232,10 +3232,9 @@ export interface DataWarehouseViewLink {
}

export interface ExternalDataSourceCreatePayload {
account_id: string
client_secret: string
payload_type: string
payload: Record<string, any>
}

export interface ExternalDataSource {
id: string
source_id: string
Expand Down
24 changes: 13 additions & 11 deletions posthog/warehouse/api/external_data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
from rest_framework import filters, serializers, viewsets
from posthog.warehouse.models import ExternalDataSource
from posthog.warehouse.external_data_source.workspace import get_or_create_workspace
from posthog.warehouse.external_data_source.source import StripeSourcePayload, create_stripe_source, delete_source
from posthog.warehouse.external_data_source.source import create_source, delete_source
from posthog.warehouse.external_data_source.source_definitions import SOURCE_TYPE_MAPPING
from posthog.warehouse.external_data_source.connection import (
create_connection,
start_sync,
Expand Down Expand Up @@ -83,16 +84,17 @@ def get_queryset(self):
return self.queryset.filter(team_id=self.team_id).prefetch_related("created_by").order_by(self.ordering)

def create(self, request: Request, *args: Any, **kwargs: Any) -> Response:
account_id = request.data["account_id"]
client_secret = request.data["client_secret"]
payload = request.data["payload"]
payload_type = request.data["payload_type"]

workspace_id = get_or_create_workspace(self.team_id)
if payload_type not in SOURCE_TYPE_MAPPING.keys():
return Response(
status=status.HTTP_400_BAD_REQUEST,
data={"detail": f"Payload type {payload_type} is not supported."},
)

stripe_payload = StripeSourcePayload(
account_id=account_id,
client_secret=client_secret,
)
new_source = create_stripe_source(stripe_payload, workspace_id)
workspace_id = get_or_create_workspace(self.team_id)
new_source = create_source(payload_type, payload, workspace_id)

try:
new_destination = create_destination(self.team_id, workspace_id)
Expand All @@ -101,7 +103,7 @@ def create(self, request: Request, *args: Any, **kwargs: Any) -> Response:
raise e

try:
new_connection = create_connection(new_source.source_id, new_destination.destination_id)
new_connection = create_connection(payload_type, new_source.source_id, new_destination.destination_id)
except Exception as e:
delete_source(new_source.source_id)
delete_destination(new_destination.destination_id)
Expand All @@ -113,7 +115,7 @@ def create(self, request: Request, *args: Any, **kwargs: Any) -> Response:
destination_id=new_destination.destination_id,
team=self.team,
status="running",
source_type="Stripe",
source_type=payload_type,
)

start_sync(new_connection.connection_id)
Expand Down
16 changes: 11 additions & 5 deletions posthog/warehouse/external_data_source/connection.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from pydantic import BaseModel
from posthog.warehouse.external_data_source.client import send_request
from posthog.warehouse.models import ExternalDataSource
from posthog.warehouse.external_data_source.source_definitions import SOURCE_TYPE_MAPPING
import structlog
from typing import List

Expand All @@ -19,17 +20,24 @@ class ExternalDataConnection(BaseModel):
workspace_id: str


def create_connection(source_id: str, destination_id: str) -> ExternalDataConnection:
def create_connection(source_type: str, source_id: str, destination_id: str) -> ExternalDataConnection:
default_streams_by_type = SOURCE_TYPE_MAPPING[source_type]["default_streams"]
payload = {
"schedule": {"scheduleType": "cron", "cronExpression": "0 0 0 * * ?"},
"namespaceFormat": None,
"sourceId": source_id,
"destinationId": destination_id,
"prefix": f"{source_type}_",
}

response = send_request(AIRBYTE_CONNECTION_URL, method="POST", payload=payload)
if default_streams_by_type:
payload["configurations"] = {
"streams": [
{"name": streamName, "syncMode": "full_refresh_overwrite"} for streamName in default_streams_by_type
]
}

update_connection_stream(response["connectionId"], ["customers"])
response = send_request(AIRBYTE_CONNECTION_URL, method="POST", payload=payload)

return ExternalDataConnection(
source_id=response["sourceId"],
Expand Down Expand Up @@ -70,14 +78,12 @@ def update_connection_status_by_id(connection_id: str, status: str):
def update_connection_stream(connection_id: str, streams: List):
connection_id_url = f"{AIRBYTE_CONNECTION_URL}/{connection_id}"

# TODO: hardcoded to stripe stream right now
payload = {
"configurations": {
"streams": [{"name": streamName, "syncMode": "full_refresh_overwrite"} for streamName in streams]
},
"schedule": {"scheduleType": "cron", "cronExpression": "0 0 0 * * ?"},
"namespaceFormat": None,
"prefix": "stripe_",
}

send_request(connection_id_url, method="PATCH", payload=payload)
Expand Down
Loading
Loading