Skip to content

Commit

Permalink
feat(data-warehouse): edit sync frequency (#22757)
Browse files Browse the repository at this point in the history
* remove postgres ff

* backend

* frontend

* update migration

* Update UI snapshots for `chromium` (2)

* comments

* update migration

* Update UI snapshots for `chromium` (2)

* Update UI snapshots for `chromium` (1)

* Update UI snapshots for `chromium` (2)

* Update UI snapshots for `chromium` (1)

* Update UI snapshots for `chromium` (2)

* Update UI snapshots for `chromium` (2)

* Update UI snapshots for `chromium` (2)

* fix tests

* Update UI snapshots for `chromium` (1)

* Update UI snapshots for `chromium` (2)

* Update UI snapshots for `chromium` (1)

* Update UI snapshots for `chromium` (1)

* Update UI snapshots for `chromium` (2)

* Update UI snapshots for `chromium` (1)

* Update UI snapshots for `chromium` (2)

* Update UI snapshots for `chromium` (1)

* Update UI snapshots for `chromium` (2)

* Update UI snapshots for `chromium` (1)

* Update UI snapshots for `chromium` (2)

* Update UI snapshots for `chromium` (2)

* format

* Update UI snapshots for `chromium` (1)

* Update UI snapshots for `chromium` (2)

* Update UI snapshots for `chromium` (1)

* Restore main

---------

Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
  • Loading branch information
EDsCODE and github-actions[bot] authored Jun 11, 2024
1 parent 09061db commit 5937992
Show file tree
Hide file tree
Showing 10 changed files with 190 additions and 25 deletions.
6 changes: 6 additions & 0 deletions frontend/src/lib/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2005,6 +2005,12 @@ const api = {
/** Trigger an immediate re-sync of the given external data source. */
async reload(sourceId: ExternalDataStripeSource['id']): Promise<void> {
    await new ApiRequest().externalDataSource(sourceId).withAction('reload').create()
},
/**
 * Partially update an external data source (e.g. its sync frequency).
 * Returns the server's canonical copy of the updated source.
 */
async update(
    sourceId: ExternalDataStripeSource['id'],
    data: Partial<ExternalDataStripeSource>
): Promise<ExternalDataStripeSource> {
    const request = new ApiRequest().externalDataSource(sourceId)
    return await request.update({ data })
},
async database_schema(
source_type: ExternalDataSourceType,
payload: Record<string, any>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
import { TZLabel } from '@posthog/apps-common'
import { LemonButton, LemonDialog, LemonSwitch, LemonTable, LemonTag, Link, Spinner, Tooltip } from '@posthog/lemon-ui'
import {
LemonButton,
LemonDialog,
LemonSelect,
LemonSwitch,
LemonTable,
LemonTag,
Link,
Spinner,
Tooltip,
} from '@posthog/lemon-ui'
import { useActions, useValues } from 'kea'
import { router } from 'kea-router'
import { ProductIntroduction } from 'lib/components/ProductIntroduction/ProductIntroduction'
Expand All @@ -14,10 +24,10 @@ import { urls } from 'scenes/urls'

import { DataTableNode, NodeKind } from '~/queries/schema'
import {
DataWarehouseSyncInterval,
ExternalDataSourceSchema,
ExternalDataSourceType,
ExternalDataStripeSource,
PipelineInterval,
ProductKey,
} from '~/types'

Expand All @@ -33,7 +43,7 @@ const StatusTagSetting = {
export function DataWarehouseSourcesTable(): JSX.Element {
const { dataWarehouseSources, dataWarehouseSourcesLoading, sourceReloadingById } =
useValues(dataWarehouseSettingsLogic)
const { deleteSource, reloadSource } = useActions(dataWarehouseSettingsLogic)
const { deleteSource, reloadSource, updateSource } = useActions(dataWarehouseSettingsLogic)

const renderExpandable = (source: ExternalDataStripeSource): JSX.Element => {
return (
Expand Down Expand Up @@ -90,8 +100,20 @@ export function DataWarehouseSourcesTable(): JSX.Element {
{
title: 'Sync Frequency',
key: 'frequency',
render: function RenderFrequency() {
return 'day' as PipelineInterval
render: function RenderFrequency(_, source) {
return (
<LemonSelect
value={source.sync_frequency || 'day'}
onChange={(value) =>
updateSource({ ...source, sync_frequency: value as DataWarehouseSyncInterval })
}
options={[
{ value: 'day' as DataWarehouseSyncInterval, label: 'Daily' },
{ value: 'week' as DataWarehouseSyncInterval, label: 'Weekly' },
{ value: 'month' as DataWarehouseSyncInterval, label: 'Monthly' },
]}
/>
)
},
},
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ export const dataWarehouseSettingsLogic = kea<dataWarehouseSettingsLogicType>([
updateSchema: (schema: ExternalDataSourceSchema) => ({ schema }),
abortAnyRunningQuery: true,
}),
loaders(({ cache, actions }) => ({
loaders(({ cache, actions, values }) => ({
dataWarehouseSources: [
null as PaginatedResponse<ExternalDataStripeSource> | null,
{
Expand All @@ -45,6 +45,15 @@ export const dataWarehouseSettingsLogic = kea<dataWarehouseSettingsLogicType>([

return res
},
updateSource: async (source: ExternalDataStripeSource) => {
    // Persist the change, then splice the SERVER's canonical copy into the
    // cached list. The original mapped in the locally-mutated `source`,
    // which silently discarded any server-side normalisation of the update.
    const updatedSource = await api.externalDataSources.update(source.id, source)
    return {
        ...values.dataWarehouseSources,
        results:
            values.dataWarehouseSources?.results.map((s) => (s.id === updatedSource.id ? updatedSource : s)) ??
            [],
    }
},
},
],
})),
Expand Down
3 changes: 3 additions & 0 deletions frontend/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3763,6 +3763,7 @@ export interface ExternalDataStripeSource {
prefix: string
last_run_at?: Dayjs
schemas: ExternalDataSourceSchema[]
sync_frequency: DataWarehouseSyncInterval
}
export interface SimpleExternalDataSourceSchema {
id: string
Expand Down Expand Up @@ -3896,6 +3897,8 @@ export type BatchExportService =

/** Run cadences selectable for batch export pipelines. */
export type PipelineInterval = 'hour' | 'day' | 'every 5 minutes'

/** Schedules available for syncing an external data warehouse source. */
export type DataWarehouseSyncInterval = 'day' | 'week' | 'month'

export type BatchExportConfiguration = {
// User provided data for the export. This is the data that the user
// provides when creating the export.
Expand Down
2 changes: 1 addition & 1 deletion latest_migrations.manifest
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name
ee: 0016_rolemembership_organization_member
otp_static: 0002_throttling
otp_totp: 0002_auto_20190420_0723
posthog: 0425_hogfunction
posthog: 0426_externaldatasource_sync_frequency
sessions: 0001_initial
social_django: 0010_uid_db_index
two_factor: 0007_auto_20201201_1019
22 changes: 22 additions & 0 deletions posthog/migrations/0426_externaldatasource_sync_frequency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Generated by Django 4.2.11 on 2024-06-06 15:43

from django.db import migrations, models


# Adds ExternalDataSource.sync_frequency so users can choose how often a
# source syncs (daily/weekly/monthly). Existing rows default to "day",
# which matches the previous hard-coded 24h schedule.
class Migration(migrations.Migration):
    dependencies = [
        ("posthog", "0425_hogfunction"),
    ]

    operations = [
        migrations.AddField(
            model_name="externaldatasource",
            name="sync_frequency",
            field=models.CharField(
                blank=True,
                choices=[("day", "Daily"), ("week", "Weekly"), ("month", "Monthly")],
                default="day",
                max_length=128,
            ),
        ),
    ]
33 changes: 19 additions & 14 deletions posthog/warehouse/api/external_data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
from posthog.api.routing import TeamAndOrgViewSetMixin
from posthog.warehouse.data_load.service import (
sync_external_data_job_workflow,
trigger_external_data_workflow,
delete_external_data_schedule,
cancel_external_data_workflow,
delete_data_import_folder,
Expand Down Expand Up @@ -78,8 +77,18 @@ class Meta:
"prefix",
"last_run_at",
"schemas",
"sync_frequency",
]
# Only mutable sync settings (currently sync_frequency) are writable via the
# API; everything else on a source is server-managed. The original had a
# second, one-line read_only_fields assignment immediately after this one
# that overwrote it and silently dropped "prefix" from the read-only set.
read_only_fields = [
    "id",
    "created_by",
    "created_at",
    "status",
    "source_type",
    "last_run_at",
    "schemas",
    "prefix",
]

def get_last_run_at(self, instance: ExternalDataSource) -> str:
latest_completed_run = (
Expand Down Expand Up @@ -116,6 +125,12 @@ def get_schemas(self, instance: ExternalDataSource):
schemas = instance.schemas.order_by("name").all()
return ExternalDataSchemaSerializer(schemas, many=True, read_only=True, context=self.context).data

def update(self, instance: ExternalDataSource, validated_data: Any) -> Any:
    """Persist the changes, then re-sync the Temporal schedules for the
    source's active schemas so a changed sync_frequency takes effect."""
    updated_source: ExternalDataSource = super().update(instance, validated_data)
    updated_source.update_schemas()

    return updated_source


class SimpleExternalDataSourceSerializers(serializers.ModelSerializer):
class Meta:
Expand Down Expand Up @@ -448,7 +463,7 @@ def destroy(self, request: Request, *args: Any, **kwargs: Any) -> Response:

@action(methods=["POST"], detail=True)
def reload(self, request: Request, *args: Any, **kwargs: Any):
instance = self.get_object()
instance: ExternalDataSource = self.get_object()

if is_any_external_data_job_paused(self.team_id):
return Response(
Expand All @@ -461,17 +476,7 @@ def reload(self, request: Request, *args: Any, **kwargs: Any):

except temporalio.service.RPCError:
# if the source schedule has been removed - trigger the schema schedules
for schema in ExternalDataSchema.objects.filter(
team_id=self.team_id, source_id=instance.id, should_sync=True
).all():
try:
trigger_external_data_workflow(schema)
except temporalio.service.RPCError as e:
if e.status == temporalio.service.RPCStatusCode.NOT_FOUND:
sync_external_data_job_workflow(schema, create=True)

except Exception as e:
logger.exception(f"Could not trigger external data job for schema {schema.name}", exc_info=e)
instance.reload_schemas()

except Exception as e:
logger.exception("Could not trigger external data job", exc_info=e)
Expand Down
49 changes: 48 additions & 1 deletion posthog/warehouse/api/test/test_external_data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,14 @@
from posthog.temporal.data_imports.pipelines.schemas import (
PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING,
)
from posthog.warehouse.data_load.service import get_sync_schedule
from django.test import override_settings
from django.conf import settings
from posthog.models import Team
import psycopg
from rest_framework import status

import datetime


class TestSavedQuery(APIBaseTest):
Expand Down Expand Up @@ -102,7 +106,17 @@ def test_get_external_data_source_with_schema(self):
self.assertEqual(response.status_code, 200)
self.assertListEqual(
list(payload.keys()),
["id", "created_at", "created_by", "status", "source_type", "prefix", "last_run_at", "schemas"],
[
"id",
"created_at",
"created_by",
"status",
"source_type",
"prefix",
"last_run_at",
"schemas",
"sync_frequency",
],
)
self.assertEqual(
payload["schemas"],
Expand Down Expand Up @@ -280,3 +294,36 @@ def test_internal_postgres(self, patch_get_postgres_schemas):
)
self.assertEqual(response.status_code, 400)
self.assertEqual(response.json(), {"message": "Cannot use internal Postgres database"})

@patch("posthog.warehouse.data_load.service.sync_external_data_job_workflow")
def test_update_source_sync_frequency(self, _patch_sync_external_data_job_workflow):
    # End-to-end: PATCHing sync_frequency updates the model AND pushes the
    # new interval into the schema's Temporal schedule spec.
    source = self._create_external_data_source()
    schema = self._create_external_data_schema(source.pk)

    self.assertEqual(source.sync_frequency, ExternalDataSource.SyncFrequency.DAILY)
    # Default frequency yields a 24h schedule interval.
    schedule = get_sync_schedule(schema)
    self.assertEqual(
        schedule.spec.intervals[0].every,
        datetime.timedelta(days=1),
    )

    # Switch to weekly through the public API.
    response = self.client.patch(
        f"/api/projects/{self.team.id}/external_data_sources/{source.pk}/",
        data={"sync_frequency": ExternalDataSource.SyncFrequency.WEEKLY},
    )
    self.assertEqual(response.status_code, status.HTTP_200_OK)

    source.refresh_from_db()
    schema.refresh_from_db()

    self.assertEqual(source.sync_frequency, ExternalDataSource.SyncFrequency.WEEKLY)
    # One schema on the source -> exactly one schedule re-sync.
    self.assertEqual(_patch_sync_external_data_job_workflow.call_count, 1)

    # Schedule now reflects the weekly cadence.
    schedule = get_sync_schedule(schema)
    self.assertEqual(
        schedule.spec.intervals[0].every,
        datetime.timedelta(days=7),
    )
17 changes: 14 additions & 3 deletions posthog/warehouse/data_load/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ def get_sync_schedule(external_data_schema: ExternalDataSchema):
external_data_source_id=external_data_schema.source_id,
)

sync_frequency = get_sync_frequency(external_data_schema)

return Schedule(
action=ScheduleActionStartWorkflow(
"external-data-job",
Expand All @@ -55,9 +57,7 @@ def get_sync_schedule(external_data_schema: ExternalDataSchema):
),
spec=ScheduleSpec(
intervals=[
ScheduleIntervalSpec(
every=timedelta(hours=24), offset=timedelta(hours=external_data_schema.created_at.hour)
)
ScheduleIntervalSpec(every=sync_frequency, offset=timedelta(hours=external_data_schema.created_at.hour))
],
jitter=timedelta(hours=2),
),
Expand All @@ -66,6 +66,17 @@ def get_sync_schedule(external_data_schema: ExternalDataSchema):
)


def get_sync_frequency(external_data_schema: ExternalDataSchema):
    """Translate the source's configured sync frequency into a timedelta.

    Raises ValueError for any frequency outside the known choices.
    """
    frequency = external_data_schema.source.sync_frequency
    intervals = {
        ExternalDataSource.SyncFrequency.DAILY: timedelta(days=1),
        ExternalDataSource.SyncFrequency.WEEKLY: timedelta(weeks=1),
        # "Monthly" is approximated as 30 days for scheduling purposes.
        ExternalDataSource.SyncFrequency.MONTHLY: timedelta(days=30),
    }
    if frequency not in intervals:
        raise ValueError(f"Unknown sync frequency: {external_data_schema.source.sync_frequency}")
    return intervals[frequency]


def sync_external_data_job_workflow(
external_data_schema: ExternalDataSchema, create: bool = False
) -> ExternalDataSchema:
Expand Down
40 changes: 40 additions & 0 deletions posthog/warehouse/models/external_data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@
from posthog.warehouse.util import database_sync_to_async
from uuid import UUID

import structlog
import temporalio

logger = structlog.get_logger(__name__)


class ExternalDataSource(CreatedMetaFields, UUIDModel):
class Type(models.TextChoices):
Expand All @@ -22,11 +27,21 @@ class Status(models.TextChoices):
COMPLETED = "Completed", "Completed"
CANCELLED = "Cancelled", "Cancelled"

class SyncFrequency(models.TextChoices):
DAILY = "day", "Daily"
WEEKLY = "week", "Weekly"
MONTHLY = "month", "Monthly"
# TODO provide flexible schedule definition

source_id: models.CharField = models.CharField(max_length=400)
connection_id: models.CharField = models.CharField(max_length=400)
destination_id: models.CharField = models.CharField(max_length=400, null=True, blank=True)
team: models.ForeignKey = models.ForeignKey(Team, on_delete=models.CASCADE)

sync_frequency: models.CharField = models.CharField(
max_length=128, choices=SyncFrequency.choices, default=SyncFrequency.DAILY, blank=True
)

# `status` is deprecated in favour of external_data_schema.status
status: models.CharField = models.CharField(max_length=400)
source_type: models.CharField = models.CharField(max_length=128, choices=Type.choices)
Expand All @@ -38,6 +53,31 @@ class Status(models.TextChoices):

__repr__ = sane_repr("id")

def reload_schemas(self):
    """Trigger an immediate sync for every schema on this source with
    should_sync enabled.

    Attempts to trigger each schema's existing Temporal workflow; if the
    schedule no longer exists (NOT_FOUND), it is recreated instead.
    NOTE(review): an RPCError with any other status is silently swallowed
    here — confirm that is intentional.
    """
    # Local imports — presumably to avoid a circular dependency with the
    # service layer; confirm before hoisting to module level.
    from posthog.warehouse.models.external_data_schema import ExternalDataSchema
    from posthog.warehouse.data_load.service import sync_external_data_job_workflow, trigger_external_data_workflow

    for schema in ExternalDataSchema.objects.filter(
        team_id=self.team.pk, source_id=self.id, should_sync=True
    ).all():
        try:
            trigger_external_data_workflow(schema)
        except temporalio.service.RPCError as e:
            # Schedule missing in Temporal — recreate it rather than fail.
            if e.status == temporalio.service.RPCStatusCode.NOT_FOUND:
                sync_external_data_job_workflow(schema, create=True)

        except Exception as e:
            logger.exception(f"Could not trigger external data job for schema {schema.name}", exc_info=e)

def update_schemas(self):
    """Push the source's current settings (e.g. sync_frequency) into the
    Temporal schedule of every schema that is enabled for syncing."""
    # Local imports — presumably to avoid a circular dependency with the
    # service layer; confirm before hoisting to module level.
    from posthog.warehouse.data_load.service import sync_external_data_job_workflow
    from posthog.warehouse.models.external_data_schema import ExternalDataSchema

    active_schemas = ExternalDataSchema.objects.filter(team_id=self.team.pk, source_id=self.id, should_sync=True)
    for active_schema in active_schemas:
        sync_external_data_job_workflow(active_schema, create=False)


@database_sync_to_async
def get_external_data_source(source_id: UUID) -> ExternalDataSource:
Expand Down

0 comments on commit 5937992

Please sign in to comment.