From fd61058f9181f6a84d00775a5ecac25049a3dd47 Mon Sep 17 00:00:00 2001 From: Eric Duong Date: Mon, 6 Nov 2023 10:48:16 -0500 Subject: [PATCH] chore(data-warehouse): add bucket name env var (#18391) * add env var * add info to error logs * add test workspace name * bucket name * updates * Update UI snapshots for `chromium` (1) * migrations --------- Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com> --- frontend/src/lib/api.ts | 10 ++++ .../settings/DataWarehouseSettingsScene.tsx | 50 ++++++++++++++++++- .../settings/dataWarehouseSettingsLogic.ts | 38 +++++++++++++- latest_migrations.manifest | 2 +- .../0360_externaldatasource_destination_id.py | 17 +++++++ posthog/settings/airbyte.py | 1 + posthog/warehouse/api/external_data_source.py | 33 ++++++++++++ .../warehouse/external_data_source/client.py | 1 + .../external_data_source/connection.py | 3 +- .../external_data_source/destination.py | 2 +- .../external_data_source/workspace.py | 8 ++- .../warehouse/models/external_data_source.py | 1 + posthog/warehouse/sync_resource.py | 14 ++++-- 13 files changed, 169 insertions(+), 11 deletions(-) create mode 100644 posthog/migrations/0360_externaldatasource_destination_id.py diff --git a/frontend/src/lib/api.ts b/frontend/src/lib/api.ts index 2c2a1abda8ed8..63e4d08fd2d8f 100644 --- a/frontend/src/lib/api.ts +++ b/frontend/src/lib/api.ts @@ -573,6 +573,10 @@ class ApiRequest { return this.projectsDetail(teamId).addPathComponent('external_data_sources') } + public externalDataSource(sourceId: ExternalDataStripeSource['id'], teamId?: TeamType['id']): ApiRequest { + return this.externalDataSources(teamId).addPathComponent(sourceId) + } + // Request finalization public async get(options?: ApiMethodOptions): Promise { return await api.get(this.assembleFullUrl(), options) @@ -1587,6 +1591,12 @@ const api = { ): Promise { return await new ApiRequest().externalDataSources().create({ data }) }, + async delete(sourceId: ExternalDataStripeSource['id']): Promise { + await new ApiRequest().externalDataSource(sourceId).delete() + }, + async reload(sourceId: ExternalDataStripeSource['id']): Promise { + await new ApiRequest().externalDataSource(sourceId).withAction('reload').create() + }, }, dataWarehouseViewLinks: { diff --git a/frontend/src/scenes/data-warehouse/settings/DataWarehouseSettingsScene.tsx b/frontend/src/scenes/data-warehouse/settings/DataWarehouseSettingsScene.tsx index 94f3204e41fe7..93d475512c20d 100644 --- a/frontend/src/scenes/data-warehouse/settings/DataWarehouseSettingsScene.tsx +++ b/frontend/src/scenes/data-warehouse/settings/DataWarehouseSettingsScene.tsx @@ -7,6 +7,8 @@ import { dataWarehouseSceneLogic } from '../external/dataWarehouseSceneLogic' import SourceModal from '../external/SourceModal' import { featureFlagLogic } from 'lib/logic/featureFlagLogic' import { FEATURE_FLAGS } from 'lib/constants' +import { More } from 'lib/lemon-ui/LemonButton/More' +import { LoadingOutlined } from '@ant-design/icons' export const scene: SceneExport = { component: DataWarehouseSettingsScene, @@ -20,7 +22,9 @@ const StatusTagSetting = { } export function DataWarehouseSettingsScene(): JSX.Element { - const { dataWarehouseSources, dataWarehouseSourcesLoading } = useValues(dataWarehouseSettingsLogic) + const { dataWarehouseSources, dataWarehouseSourcesLoading, sourceReloadingById } = + useValues(dataWarehouseSettingsLogic) + const { deleteSource, reloadSource } = useActions(dataWarehouseSettingsLogic) const { toggleSourceModal } = useActions(dataWarehouseSceneLogic) const { isSourceModalOpen } = useValues(dataWarehouseSceneLogic) const { featureFlags } = useValues(featureFlagLogic) @@ -77,6 +81,50 @@ export function DataWarehouseSettingsScene(): JSX.Element { ) }, }, + { + key: 'actions', + width: 0, + render: function RenderActions(_, source) { + return ( +
+ {sourceReloadingById[source.id] ? ( +
+ +
+ ) : ( +
+ + { + reloadSource(source) + }} + > + Reload + + { + deleteSource(source) + }} + > + Delete + + + } + /> +
+ )} +
+ ) + }, + }, ]} /> diff --git a/frontend/src/scenes/data-warehouse/settings/dataWarehouseSettingsLogic.ts b/frontend/src/scenes/data-warehouse/settings/dataWarehouseSettingsLogic.ts index 5be2cb17dfbf5..a7c40b36401b3 100644 --- a/frontend/src/scenes/data-warehouse/settings/dataWarehouseSettingsLogic.ts +++ b/frontend/src/scenes/data-warehouse/settings/dataWarehouseSettingsLogic.ts @@ -1,4 +1,4 @@ -import { afterMount, kea, path, selectors } from 'kea' +import { actions, afterMount, kea, listeners, path, reducers, selectors } from 'kea' import type { dataWarehouseSettingsLogicType } from './dataWarehouseSettingsLogicType' import { loaders } from 'kea-loaders' @@ -10,6 +10,11 @@ export interface DataWarehouseSource {} export const dataWarehouseSettingsLogic = kea([ path(['scenes', 'data-warehouse', 'settings', 'dataWarehouseSettingsLogic']), + actions({ + deleteSource: (source: ExternalDataStripeSource) => ({ source }), + reloadSource: (source: ExternalDataStripeSource) => ({ source }), + loadingFinished: (source: ExternalDataStripeSource) => ({ source }), + }), loaders({ dataWarehouseSources: [ null as PaginatedResponse | null, @@ -20,6 +25,25 @@ export const dataWarehouseSettingsLogic = kea([ }, ], }), + reducers({ + sourceReloadingById: [ + {} as Record, + { + reloadSource: (state, { source }) => ({ + ...state, + [source.id]: true, + }), + deleteSource: (state, { source }) => ({ + ...state, + [source.id]: true, + }), + loadingFinished: (state, { source }) => ({ + ...state, + [source.id]: false, + }), + }, + ], + }), selectors({ breadcrumbs: [ () => [], @@ -35,6 +59,18 @@ export const dataWarehouseSettingsLogic = kea([ ], ], }), + listeners(({ actions }) => ({ + deleteSource: async ({ source }) => { + await api.externalDataSources.delete(source.id) + actions.loadSources() + actions.loadingFinished(source) + }, + reloadSource: async ({ source }) => { + await api.externalDataSources.reload(source.id) + actions.loadSources() + actions.loadingFinished(source) + }, + })), afterMount(({ actions }) => { actions.loadSources() }), diff --git a/latest_migrations.manifest b/latest_migrations.manifest index 5d996c9f43b48..814bf32751eb3 100644 --- a/latest_migrations.manifest +++ b/latest_migrations.manifest @@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name ee: 0015_add_verified_properties otp_static: 0002_throttling otp_totp: 0002_auto_20190420_0723 -posthog: 0359_team_external_data_workspace_id +posthog: 0360_externaldatasource_destination_id sessions: 0001_initial social_django: 0010_uid_db_index two_factor: 0007_auto_20201201_1019 diff --git a/posthog/migrations/0360_externaldatasource_destination_id.py b/posthog/migrations/0360_externaldatasource_destination_id.py new file mode 100644 index 0000000000000..de83cb78f3b31 --- /dev/null +++ b/posthog/migrations/0360_externaldatasource_destination_id.py @@ -0,0 +1,17 @@ +# Generated by Django 3.2.19 on 2023-11-03 18:40 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("posthog", "0359_team_external_data_workspace_id"), + ] + + operations = [ + migrations.AddField( + model_name="externaldatasource", + name="destination_id", + field=models.CharField(blank=True, max_length=400, null=True), + ), + ] diff --git a/posthog/settings/airbyte.py b/posthog/settings/airbyte.py index 00a73e720144a..5be12b697d1b8 100644 --- a/posthog/settings/airbyte.py +++ b/posthog/settings/airbyte.py @@ -5,3 +5,4 @@ AIRBYTE_BUCKET_KEY = os.getenv("AIRBYTE_BUCKET_KEY", None) AIRBYTE_BUCKET_SECRET = os.getenv("AIRBYTE_BUCKET_SECRET", None) AIRBYTE_BUCKET_DOMAIN = os.getenv("AIRBYTE_BUCKET_DOMAIN", None) +AIRBYTE_BUCKET_NAME = os.getenv("AIRBYTE_BUCKET_NAME", None) diff --git a/posthog/warehouse/api/external_data_source.py b/posthog/warehouse/api/external_data_source.py index 72a839741f125..3bfcc64e497d1 100644 --- a/posthog/warehouse/api/external_data_source.py +++ b/posthog/warehouse/api/external_data_source.py @@ -10,10 +10,15 @@ from posthog.warehouse.external_data_source.source import StripeSourcePayload, create_stripe_source, delete_source from posthog.warehouse.external_data_source.connection import create_connection, start_sync from posthog.warehouse.external_data_source.destination import create_destination, delete_destination +from posthog.warehouse.sync_resource import sync_resource from posthog.api.routing import StructuredViewSetMixin +from rest_framework.decorators import action from posthog.models import User from typing import Any +import structlog + +logger = structlog.get_logger(__name__) class ExternalDataSourceSerializers(serializers.ModelSerializer): @@ -75,6 +80,7 @@ def create(self, request: Request, *args: Any, **kwargs: Any) -> Response: ExternalDataSource.objects.create( source_id=new_source.source_id, connection_id=new_connection.connection_id, + destination_id=new_destination.destination_id, team=self.team, status="running", source_type="Stripe", @@ -83,3 +89,30 @@ def create(self, request: Request, *args: Any, **kwargs: Any) -> Response: start_sync(new_connection.connection_id) return Response(status=status.HTTP_201_CREATED, data={"source_id": new_source.source_id}) + + def destroy(self, request: Request, *args: Any, **kwargs: Any) -> Response: + instance = self.get_object() + + try: + delete_source(instance.source_id) + except Exception as e: + logger.exception( + f"Data Warehouse: Failed to delete source with id: {instance.source_id}", + exc_info=e, + ) + + try: + delete_destination(instance.destination_id) + except Exception as e: + logger.exception( + f"Data Warehouse: Failed to delete destination with id: {instance.destination_id}", + exc_info=e, + ) + + return super().destroy(request, *args, **kwargs) + + @action(methods=["POST"], detail=True) + def reload(self, request: Request, *args: Any, **kwargs: Any): + instance = self.get_object() + sync_resource(instance.id) + return Response(status=status.HTTP_200_OK) diff --git a/posthog/warehouse/external_data_source/client.py b/posthog/warehouse/external_data_source/client.py index ef37c92a7c34a..a4c3d1da66c98 100644 --- a/posthog/warehouse/external_data_source/client.py +++ b/posthog/warehouse/external_data_source/client.py @@ -17,6 +17,7 @@ def send_request(path, method, params=None, payload=None): response = requests.patch(path, json=payload, headers=headers) elif method == "DELETE": response = requests.delete(path, headers=headers) + return else: raise ValueError(f"Invalid method: {method}") diff --git a/posthog/warehouse/external_data_source/connection.py b/posthog/warehouse/external_data_source/connection.py index 38619dc0900fb..fc89f22abb65b 100644 --- a/posthog/warehouse/external_data_source/connection.py +++ b/posthog/warehouse/external_data_source/connection.py @@ -62,7 +62,8 @@ def start_sync(connection_id: str): send_request(AIRBYTE_JOBS_URL, method="POST", payload=payload) except Exception as e: logger.exception( - f"Sync Resource failed with an unexpected exception for connection id: {connection_id}", exc_info=e + f"Data Warehouse: Sync Resource failed with an unexpected exception for connection id: {connection_id}", + exc_info=e, ) diff --git a/posthog/warehouse/external_data_source/destination.py b/posthog/warehouse/external_data_source/destination.py index eb45744df8340..99966a7153a98 100644 --- a/posthog/warehouse/external_data_source/destination.py +++ b/posthog/warehouse/external_data_source/destination.py @@ -17,7 +17,7 @@ def create_destination(team_id: int, workspace_id: str) -> ExternalDataDestinati "s3_bucket_region": settings.AIRBYTE_BUCKET_REGION, "access_key_id": settings.AIRBYTE_BUCKET_KEY, "secret_access_key": settings.AIRBYTE_BUCKET_SECRET, - "s3_bucket_name": "databeach-hackathon", + "s3_bucket_name": settings.AIRBYTE_BUCKET_NAME, "s3_bucket_path": f"airbyte/{team_id}", }, "name": f"S3/{team_id}", diff --git a/posthog/warehouse/external_data_source/workspace.py b/posthog/warehouse/external_data_source/workspace.py index 086f7c46def96..e92c07fc888cd 100644 --- a/posthog/warehouse/external_data_source/workspace.py +++ b/posthog/warehouse/external_data_source/workspace.py @@ -1,11 +1,17 @@ from posthog.models import Team from posthog.warehouse.external_data_source.client import send_request +from django.conf import settings AIRBYTE_WORKSPACE_URL = "https://api.airbyte.com/v1/workspaces" def create_workspace(team_id: int): - payload = {"name": "Team " + str(team_id)} + if settings.DEBUG or settings.TEST: + workspace_name = "Team " + str(team_id) + " (TEST)" + else: + workspace_name = "Team " + str(team_id) + + payload = {"name": workspace_name} response = send_request(AIRBYTE_WORKSPACE_URL, method="POST", payload=payload) return response["workspaceId"] diff --git a/posthog/warehouse/models/external_data_source.py b/posthog/warehouse/models/external_data_source.py index 60171a7035cc2..ed2f4f77d49cc 100644 --- a/posthog/warehouse/models/external_data_source.py +++ b/posthog/warehouse/models/external_data_source.py @@ -9,6 +9,7 @@ class Type(models.TextChoices): source_id: models.CharField = models.CharField(max_length=400) connection_id: models.CharField = models.CharField(max_length=400) + destination_id: models.CharField = models.CharField(max_length=400, null=True, blank=True) team: models.ForeignKey = models.ForeignKey(Team, on_delete=models.CASCADE) status: models.CharField = models.CharField(max_length=400) source_type: models.CharField = models.CharField(max_length=128, choices=Type.choices) diff --git a/posthog/warehouse/sync_resource.py b/posthog/warehouse/sync_resource.py index 886bd00feadef..3072bf43986d9 100644 --- a/posthog/warehouse/sync_resource.py +++ b/posthog/warehouse/sync_resource.py @@ -13,25 +13,26 @@ def sync_resources(): resources = ExternalDataSource.objects.filter(are_tables_created=False, status__in=["running", "error"]) for resource in resources: - _sync_resource.delay(resource.pk) + sync_resource.delay(resource.pk) @app.task(ignore_result=True) -def _sync_resource(resource_id): +def sync_resource(resource_id): resource = ExternalDataSource.objects.get(pk=resource_id) try: job = retrieve_sync(resource.connection_id) except Exception as e: - logger.exception("Sync Resource failed with an unexpected exception.", exc_info=e) + logger.exception("Data Warehouse: Sync Resource failed with an unexpected exception.", exc_info=e) resource.status = "error" resource.save() return if job is None: - logger.error(f"No jobs found for connection: {resource.connection_id}") + logger.error(f"Data Warehouse: No jobs found for connection: {resource.connection_id}") resource.status = "error" resource.save() + return if job["status"] == "succeeded": resource = ExternalDataSource.objects.get(pk=resource_id) @@ -53,7 +54,10 @@ def _sync_resource(resource_id): try: table.columns = table.get_columns() except Exception as e: - logger.exception("Sync Resource failed with an unexpected exception.", exc_info=e) + logger.exception( + f"Data Warehouse: Sync Resource failed with an unexpected exception for connection: {resource.connection_id}", + exc_info=e, + ) else: table.save()