Skip to content

Commit

Permalink
feat(data-warehouse): Added UI to select sync type (#22906)
Browse files Browse the repository at this point in the history
* Added UI to select sync type

* Fixed tests

* Update UI snapshots for `chromium` (2)

* Update UI snapshots for `chromium` (2)

* Fixed migration

* Update UI snapshots for `chromium` (2)

* Update UI snapshots for `chromium` (2)

* Update UI snapshots for `chromium` (2)

* Update UI snapshots for `chromium` (2)

* Added backfill to migration

* Update UI snapshots for `chromium` (1)

* Update UI snapshots for `chromium` (2)

* Update UI snapshots for `chromium` (1)

* Fixed test

* Update UI snapshots for `chromium` (1)

* Update UI snapshots for `chromium` (2)

* Update UI snapshots for `chromium` (1)

* Update UI snapshots for `chromium` (2)

* Update UI snapshots for `chromium` (2)

* Update UI snapshots for `chromium` (2)

---------

Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
  • Loading branch information
Gilbert09 and github-actions[bot] authored Jun 17, 2024
1 parent 0ac37ae commit 14f114a
Show file tree
Hide file tree
Showing 11 changed files with 300 additions and 35 deletions.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
@@ -1,11 +1,29 @@
import { LemonSwitch, LemonTable, Link } from '@posthog/lemon-ui'
import { LemonSelect, LemonSelectOptionLeaf, LemonSwitch, LemonTable, Link } from '@posthog/lemon-ui'
import { useActions, useValues } from 'kea'
import { useState } from 'react'

import { ExternalDataSourceSyncSchema } from '~/types'

import { sourceWizardLogic } from '../../new/sourceWizardLogic'

/**
 * Builds the sync-type dropdown options for a single schema row.
 *
 * Only the strategies flagged as available in `schema.sync_types` are
 * returned, always in the order "Full refresh", then "Incremental".
 *
 * @param schema - The schema row whose available sync types are inspected.
 * @returns The select options to offer for this row (possibly empty).
 */
const syncTypesToOptions = (
    schema: ExternalDataSourceSyncSchema
): LemonSelectOptionLeaf<ExternalDataSourceSyncSchema['sync_type']>[] => {
    // Every possible option paired with the flag that decides whether it is offered.
    const candidates: {
        enabled: boolean
        option: LemonSelectOptionLeaf<ExternalDataSourceSyncSchema['sync_type']>
    }[] = [
        { enabled: schema.sync_types.full_refresh, option: { value: 'full_refresh', label: 'Full refresh' } },
        { enabled: schema.sync_types.incremental, option: { value: 'incremental', label: 'Incremental' } },
    ]

    return candidates.filter(({ enabled }) => enabled).map(({ option }) => option)
}

export default function PostgresSchemaForm(): JSX.Element {
const { toggleSchemaShouldSync } = useActions(sourceWizardLogic)
const { toggleSchemaShouldSync, updateSchemaSyncType } = useActions(sourceWizardLogic)
const { databaseSchema } = useValues(sourceWizardLogic)
const [toggleAllState, setToggleAllState] = useState(false)

Expand Down Expand Up @@ -55,6 +73,23 @@ export default function PostgresSchemaForm(): JSX.Element {
)
},
},
{
key: 'sync_type',
title: 'Sync type',
tooltip:
'Full refresh will refresh the full table on every sync, whereas incremental will only sync new and updated rows since the last sync',
render: (_, schema) => {
const options = syncTypesToOptions(schema)

return (
<LemonSelect
options={options}
value={schema.sync_type}
onChange={(newValue) => updateSchemaSyncType(schema, newValue)}
/>
)
},
},
]}
/>
</div>
Expand Down
22 changes: 19 additions & 3 deletions frontend/src/scenes/data-warehouse/new/sourceWizardLogic.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,13 @@ export const sourceWizardLogic = kea<sourceWizardLogicType>([
onSubmit: true,
setDatabaseSchemas: (schemas: ExternalDataSourceSyncSchema[]) => ({ schemas }),
toggleSchemaShouldSync: (schema: ExternalDataSourceSyncSchema, shouldSync: boolean) => ({ schema, shouldSync }),
// Action payload: the schema row being edited plus the sync type chosen for it.
updateSchemaSyncType: (
    schema: ExternalDataSourceSyncSchema,
    sync_type: ExternalDataSourceSyncSchema['sync_type']
) => {
    return { schema, sync_type }
},
clearSource: true,
updateSource: (source: Partial<ExternalDataSourceCreatePayload>) => ({ source }),
createSource: true,
Expand Down Expand Up @@ -408,6 +415,13 @@ export const sourceWizardLogic = kea<sourceWizardLogicType>([
}))
return newSchema
},
// Applies the new sync type to the row whose table matches the edited schema.
// Every row is shallow-copied, so the reducer always returns fresh objects.
updateSchemaSyncType: (state, { schema, sync_type }) =>
    state.map((entry) => {
        const nextSyncType = entry.table === schema.table ? sync_type : entry.sync_type
        return { ...entry, sync_type: nextSyncType }
    }),
},
],
source: [
Expand Down Expand Up @@ -613,9 +627,11 @@ export const sourceWizardLogic = kea<sourceWizardLogicType>([
if (values.currentStep === 3 && values.selectedConnector?.name) {
actions.updateSource({
payload: {
schemas: values.databaseSchema
.filter((schema) => schema.should_sync)
.map((schema) => schema.table),
schemas: values.databaseSchema.map((schema) => ({
name: schema.table,
should_sync: schema.should_sync,
sync_type: schema.sync_type,
})),
},
})
actions.setIsLoading(true)
Expand Down
5 changes: 5 additions & 0 deletions frontend/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3835,6 +3835,11 @@ export interface SimpleExternalDataSourceSchema {
/**
 * One selectable table/endpoint row shown while configuring an external data source.
 */
export interface ExternalDataSourceSyncSchema {
/** Name of the source table or endpoint this row refers to. */
table: string
/** Whether this table is currently selected to be synced. */
should_sync: boolean
/** The sync strategy currently chosen for this table. */
sync_type: 'full_refresh' | 'incremental'
/** Which sync strategies may be offered for this table (true = available). */
sync_types: {
full_refresh: boolean
incremental: boolean
}
}

export interface ExternalDataSourceSchema extends SimpleExternalDataSourceSchema {
Expand Down
2 changes: 1 addition & 1 deletion latest_migrations.manifest
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name
ee: 0016_rolemembership_organization_member
otp_static: 0002_throttling
otp_totp: 0002_auto_20190420_0723
posthog: 0427_hogfunction_icon_url_hogfunction_template_id
posthog: 0428_externaldataschema_sync_type
sessions: 0001_initial
social_django: 0010_uid_db_index
two_factor: 0007_auto_20201201_1019
31 changes: 31 additions & 0 deletions posthog/migrations/0428_externaldataschema_sync_type.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Generated by Django 4.2.11 on 2024-06-11 17:59

from django.db import migrations, models


# Adds the `sync_type` column to `externaldataschema` and backfills existing rows.
class Migration(migrations.Migration):
dependencies = [
("posthog", "0427_hogfunction_icon_url_hogfunction_template_id"),
]

operations = [
# New column; rows get "full_refresh" unless set otherwise.
migrations.AddField(
model_name="externaldataschema",
name="sync_type",
field=models.CharField(
blank=True,
choices=[("full_refresh", "full_refresh"), ("incremental", "incremental")],
default="full_refresh",
max_length=128,
),
),
# Backfill: schemas named 'Invoice' that belong to a 'Stripe' source are marked
# 'incremental'; all other rows keep the column default. The reverse step is a no-op.
migrations.RunSQL(
sql="""
UPDATE posthog_externaldataschema AS schema
SET sync_type = 'incremental'
FROM posthog_externaldatasource AS source
WHERE schema.source_id = source.id AND source.source_type = 'Stripe' AND schema.name = 'Invoice'
""",
reverse_sql=migrations.RunSQL.noop,
),
]
2 changes: 1 addition & 1 deletion posthog/temporal/data_imports/pipelines/stripe/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@
# These endpoints are converted into ExternalDataSchema objects when a source is linked.
ENDPOINTS = ("BalanceTransaction", "Subscription", "Customer", "Product", "Price", "Invoice", "Charge")

INCREMENTAL_ENDPOINTS = "Invoice"
INCREMENTAL_ENDPOINTS = ("Invoice",)
16 changes: 15 additions & 1 deletion posthog/warehouse/api/external_data_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,29 @@
class ExternalDataSchemaSerializer(serializers.ModelSerializer):
table = serializers.SerializerMethodField(read_only=True)
incremental = serializers.SerializerMethodField(read_only=True)
sync_type = serializers.SerializerMethodField(read_only=True)

class Meta:
model = ExternalDataSchema

fields = ["id", "name", "table", "should_sync", "last_synced_at", "latest_error", "incremental", "status"]
fields = [
"id",
"name",
"table",
"should_sync",
"last_synced_at",
"latest_error",
"incremental",
"status",
"sync_type",
]

def get_incremental(self, schema: ExternalDataSchema) -> bool:
return schema.is_incremental

def get_sync_type(self, schema: ExternalDataSchema) -> ExternalDataSchema.SyncType:
return schema.sync_type or ExternalDataSchema.SyncType.FULL_REFRESH

def get_table(self, schema: ExternalDataSchema) -> Optional[dict]:
from posthog.warehouse.api.table import SimpleTableSerializer

Expand Down
69 changes: 54 additions & 15 deletions posthog/warehouse/api/external_data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from posthog.warehouse.api.external_data_schema import ExternalDataSchemaSerializer
from posthog.hogql.database.database import create_hogql_database
from posthog.temporal.data_imports.pipelines.schemas import (
PIPELINE_TYPE_INCREMENTAL_ENDPOINTS_MAPPING,
PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING,
)
from posthog.temporal.data_imports.pipelines.hubspot.auth import (
Expand Down Expand Up @@ -206,30 +207,42 @@ def create(self, request: Request, *args: Any, **kwargs: Any) -> Response:
raise NotImplementedError(f"Source type {source_type} not implemented")

payload = request.data["payload"]
enabled_schemas = payload.get("schemas", None)
schemas = payload.get("schemas", None)
if source_type == ExternalDataSource.Type.POSTGRES:
default_schemas = postgres_schemas
elif source_type == ExternalDataSource.Type.SNOWFLAKE:
default_schemas = snowflake_schemas
else:
default_schemas = list(PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING[source_type])

# Fallback to defaults if schemas is missing
if enabled_schemas is None:
enabled_schemas = PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING[source_type]
if not schemas or not isinstance(schemas, list):
new_source_model.delete()
return Response(
status=status.HTTP_400_BAD_REQUEST,
data={"message": "Schemas not given"},
)

disabled_schemas = [schema for schema in default_schemas if schema not in enabled_schemas]
# Return 400 if we get any schema names that don't exist in our source
if any(schema.get("name") not in default_schemas for schema in schemas):
new_source_model.delete()
return Response(
status=status.HTTP_400_BAD_REQUEST,
data={"message": "Schemas given do not exist in source"},
)

active_schemas: list[ExternalDataSchema] = []

for schema in enabled_schemas:
active_schemas.append(
ExternalDataSchema.objects.create(
name=schema, team=self.team, source=new_source_model, should_sync=True
)
for schema in schemas:
schema_model = ExternalDataSchema.objects.create(
name=schema.get("name"),
team=self.team,
source=new_source_model,
should_sync=schema.get("should_sync"),
sync_type=schema.get("sync_type"),
)
for schema in disabled_schemas:
ExternalDataSchema.objects.create(name=schema, team=self.team, source=new_source_model, should_sync=False)

if schema.get("should_sync"):
active_schemas.append(schema_model)

try:
for active_schema in active_schemas:
Expand Down Expand Up @@ -594,7 +607,15 @@ def database_schema(self, request: Request, *arg: Any, **kwargs: Any):
data={"message": GenericPostgresError},
)

result_mapped_to_options = [{"table": row, "should_sync": True} for row in result]
result_mapped_to_options = [
{
"table": row,
"should_sync": True,
"sync_types": {"full_refresh": True, "incremental": False},
"sync_type": "full_refresh",
}
for row in result
]
return Response(status=status.HTTP_200_OK, data=result_mapped_to_options)
elif source_type == ExternalDataSource.Type.SNOWFLAKE:
account_id = request.data.get("account_id")
Expand Down Expand Up @@ -638,18 +659,36 @@ def database_schema(self, request: Request, *arg: Any, **kwargs: Any):
status=status.HTTP_400_BAD_REQUEST,
data={"message": GenericSnowflakeError},
)
result_mapped_to_options = [{"table": row, "should_sync": True} for row in result]
result_mapped_to_options = [
{
"table": row,
"should_sync": True,
"sync_types": {"full_refresh": True, "incremental": False},
"sync_type": "full_refresh",
}
for row in result
]
return Response(status=status.HTTP_200_OK, data=result_mapped_to_options)

# Return the possible endpoints for all other source types
schemas = PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING.get(source_type, None)
incremental_schemas = PIPELINE_TYPE_INCREMENTAL_ENDPOINTS_MAPPING.get(source_type, ())

if schemas is None:
return Response(
status=status.HTTP_400_BAD_REQUEST,
data={"message": "Invalid parameter: source_type"},
)

options = [{"table": row, "should_sync": True} for row in schemas]
options = [
{
"table": row,
"should_sync": True,
"sync_types": {"full_refresh": True, "incremental": row in incremental_schemas},
"sync_type": "incremental" if row in incremental_schemas else "full_refresh",
}
for row in schemas
]
return Response(status=status.HTTP_200_OK, data=options)

@action(methods=["POST"], detail=False)
Expand Down
Loading

0 comments on commit 14f114a

Please sign in to comment.