From 32721ef1fded3f3103bed78c878f835131171559 Mon Sep 17 00:00:00 2001 From: eric Date: Mon, 9 Oct 2023 12:25:47 -0400 Subject: [PATCH] rename and add connection --- latest_migrations.manifest | 2 +- posthog/api/__init__.py | 4 +- ...rbytesource.py => 0351_airbyteresource.py} | 3 +- posthog/settings/airbyte.py | 1 + posthog/warehouse/airbyte/connection.py | 42 +++++++++++++++++++ .../{airbyte.py => airbyte/source.py} | 2 +- ...{airbyte_source.py => airbyte_resource.py} | 24 +++++++---- posthog/warehouse/api/test/test_airbyte.py | 17 ++++---- posthog/warehouse/models/__init__.py | 2 +- ...{airbyte_source.py => airbyte_resource.py} | 3 +- 10 files changed, 75 insertions(+), 25 deletions(-) rename posthog/migrations/{0351_airbytesource.py => 0351_airbyteresource.py} (92%) create mode 100644 posthog/warehouse/airbyte/connection.py rename posthog/warehouse/{airbyte.py => airbyte/source.py} (98%) rename posthog/warehouse/api/{airbyte_source.py => airbyte_resource.py} (73%) rename posthog/warehouse/models/{airbyte_source.py => airbyte_resource.py} (72%) diff --git a/latest_migrations.manifest b/latest_migrations.manifest index cd3ddf44076b7..3b3955466ff2f 100644 --- a/latest_migrations.manifest +++ b/latest_migrations.manifest @@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name ee: 0015_add_verified_properties otp_static: 0002_throttling otp_totp: 0002_auto_20190420_0723 -posthog: 0351_airbytesource +posthog: 0351_airbyteresource sessions: 0001_initial social_django: 0010_uid_db_index two_factor: 0007_auto_20201201_1019 diff --git a/posthog/api/__init__.py b/posthog/api/__init__.py index 912de6ac23737..cebe00419033e 100644 --- a/posthog/api/__init__.py +++ b/posthog/api/__init__.py @@ -3,7 +3,7 @@ from posthog.api.routing import DefaultRouterPlusPlus from posthog.batch_exports import http as batch_exports from posthog.settings import EE_AVAILABLE -from posthog.warehouse.api import airbyte_source, saved_query, table, view_link +from posthog.warehouse.api import airbyte_resource, saved_query, table, view_link from . import ( activity_log, annotation, @@ -194,7 +194,7 @@ def api_not_found(request): # Airbyte projects_router.register( - r"airbyte_sources", airbyte_source.AirbyteSourceViewSet, "project_airbyte_sources", ["team_id"] + r"airbyte_resources", airbyte_resource.AirbyteSourceViewSet, "project_airbyte_resources", ["team_id"] ) # General endpoints (shared across CH & PG) diff --git a/posthog/migrations/0351_airbytesource.py b/posthog/migrations/0351_airbyteresource.py similarity index 92% rename from posthog/migrations/0351_airbytesource.py rename to posthog/migrations/0351_airbyteresource.py index a19308d70a887..2e7abe85f6d33 100644 --- a/posthog/migrations/0351_airbytesource.py +++ b/posthog/migrations/0351_airbyteresource.py @@ -14,7 +14,7 @@ class Migration(migrations.Migration): operations = [ migrations.CreateModel( - name="AirbyteSource", + name="AirbyteResource", fields=[ ("created_at", models.DateTimeField(auto_now_add=True)), ( @@ -24,6 +24,7 @@ class Migration(migrations.Migration): ), ), ("source_id", models.CharField(max_length=400)), + ("connection_id", models.CharField(max_length=400)), ( "created_by", models.ForeignKey( diff --git a/posthog/settings/airbyte.py b/posthog/settings/airbyte.py index 2987aa3b4c71f..6016b25f3e6d9 100644 --- a/posthog/settings/airbyte.py +++ b/posthog/settings/airbyte.py @@ -3,3 +3,4 @@ AIRBYTE_API_KEY = os.getenv("AIRBYTE_API_KEY", None) AIRBYTE_WORKSPACE_ID = os.getenv("AIRBYTE_WORKSPACE_ID", None) AIRBYTE_INSTANCE_NAME = os.getenv("AIRBYTE_INSTANCE_NAME", None) +AIRBYTE_DESTINATION_ID = os.getenv("AIRBYTE_DESTINATION_ID", None) diff --git a/posthog/warehouse/airbyte/connection.py b/posthog/warehouse/airbyte/connection.py new file mode 100644 index 0000000000000..3a3122eac9297 --- /dev/null +++ b/posthog/warehouse/airbyte/connection.py @@ -0,0 +1,42 @@ +import requests +from django.conf import settings +from pydantic import BaseModel + +AIRBYTE_CONNECTION_URL = "https://api.airbyte.com/v1/connections" + + +class AirbyteConnection(BaseModel): + connection_id: str + source_id: str + destination_id: str + name: str + workspace_id: str + + +def create_connection(source_id: str) -> AirbyteConnection: + token = settings.AIRBYTE_API_KEY + if not token: + raise ValueError("AIRBYTE_API_KEY must be set in order to create a source.") + + headers = {"accept": "application/json", "content-type": "application/json", "Authorization": f"Bearer {token}"} + + payload = { + "schedule": {"scheduleType": "cron", "cronExpression": "0 0 0 * * ?"}, + "namespaceFormat": None, + "sourceId": source_id, + "destinationId": settings.AIRBYTE_DESTINATION_ID, + } + + response = requests.post(AIRBYTE_CONNECTION_URL, json=payload, headers=headers) + response_payload = response.json() + + if not response.ok: + raise ValueError(response_payload["detail"]) + + return AirbyteConnection( + source_id=response_payload["sourceId"], + name=response_payload["name"], + connection_id=response_payload["connectionId"], + workspace_id=response_payload["workspaceId"], + destination_id=response_payload["destinationId"], + ) diff --git a/posthog/warehouse/airbyte.py b/posthog/warehouse/airbyte/source.py similarity index 98% rename from posthog/warehouse/airbyte.py rename to posthog/warehouse/airbyte/source.py index f33eab187bb5f..58175e834094f 100644 --- a/posthog/warehouse/airbyte.py +++ b/posthog/warehouse/airbyte/source.py @@ -87,7 +87,7 @@ def _create_source(payload: Dict) -> AirbyteSource: response = requests.post(AIRBYTE_SOURCE_URL, json=payload, headers=headers) response_payload = response.json() if not response.ok: - raise ValueError(response_payload["message"]) + raise ValueError(response_payload["detail"]) return AirbyteSource( source_id=response_payload["sourceId"], diff --git a/posthog/warehouse/api/airbyte_source.py b/posthog/warehouse/api/airbyte_resource.py similarity index 73% rename from posthog/warehouse/api/airbyte_source.py rename to posthog/warehouse/api/airbyte_resource.py index b1fa5ec2186d4..8a51dfb3ede1c 100644 --- a/posthog/warehouse/api/airbyte_source.py +++ b/posthog/warehouse/api/airbyte_resource.py @@ -5,20 +5,21 @@ from rest_framework.exceptions import NotAuthenticated from rest_framework.permissions import IsAuthenticated from rest_framework import filters, serializers, viewsets -from posthog.warehouse.models import AirbyteSource -from posthog.warehouse.airbyte import StripeSourcePayload, create_stripe_source +from posthog.warehouse.models import AirbyteResource +from posthog.warehouse.airbyte.source import StripeSourcePayload, create_stripe_source +from posthog.warehouse.airbyte.connection import create_connection from posthog.api.routing import StructuredViewSetMixin from posthog.models import User from typing import Any -class AirbyteSourceSerializers(serializers.ModelSerializer): +class AirbyteResourceSerializers(serializers.ModelSerializer): account_id = serializers.CharField(write_only=True) client_secret = serializers.CharField(write_only=True) class Meta: - model = AirbyteSource + model = AirbyteResource fields = ["id", "source_id", "created_at", "created_by"] read_only_fields = [ "id", @@ -33,8 +34,8 @@ class AirbyteSourceViewSet(StructuredViewSetMixin, viewsets.ModelViewSet): Create, Read, Update and Delete Airbyte Sources. """ - queryset = AirbyteSource.objects.all() - serializer_class = AirbyteSourceSerializers + queryset = AirbyteResource.objects.all() + serializer_class = AirbyteResourceSerializers permission_classes = [IsAuthenticated, OrganizationMemberPermissions] filter_backends = [filters.SearchFilter] search_fields = ["source_id"] @@ -62,8 +63,13 @@ def create(self, request: Request, *args: Any, **kwargs: Any) -> Response: account_id=account_id, client_secret=client_secret, ) - stripe_response = create_stripe_source(stripe_payload) + new_source = create_stripe_source(stripe_payload) + new_connection = create_connection(new_source.source_id) - AirbyteSource.objects.create(source_id=stripe_response.source_id, team=self.request.user.current_team) + AirbyteResource.objects.create( + source_id=new_source.source_id, + connection_id=new_connection.connection_id, + team=self.request.user.current_team, + ) - return Response(status=status.HTTP_201_CREATED, data={"source_id": stripe_response.source_id}) + return Response(status=status.HTTP_201_CREATED, data={"source_id": new_source.source_id}) diff --git a/posthog/warehouse/api/test/test_airbyte.py b/posthog/warehouse/api/test/test_airbyte.py index 8b57b905df110..faa5210b50e7d 100644 --- a/posthog/warehouse/api/test/test_airbyte.py +++ b/posthog/warehouse/api/test/test_airbyte.py @@ -4,12 +4,11 @@ class TestAirbyteSource(APIBaseTest): - pass - # def test_create(self): - # response = self.client.post( - # f"/api/projects/{self.team.id}/airbyte_sources/", - # {"account_id": "123", "client_secret": "123"}, - # ) - # self.assertEqual(response.status_code, 201, response.content) - # airbyte_source = response.json() - # self.assertIsNotNone(airbyte_source["source_id"]) + def test_create(self): + response = self.client.post( + f"/api/projects/{self.team.id}/airbyte_resources/", + {"account_id": "123", "client_secret": "123"}, + ) + self.assertEqual(response.status_code, 201, response.content) + airbyte_source = response.json() + self.assertIsNotNone(airbyte_source["source_id"]) diff --git a/posthog/warehouse/models/__init__.py b/posthog/warehouse/models/__init__.py index b90d9e4893877..1722c66a6c023 100644 --- a/posthog/warehouse/models/__init__.py +++ b/posthog/warehouse/models/__init__.py @@ -2,4 +2,4 @@ from .credential import * from .datawarehouse_saved_query import * from .view_link import * -from .airbyte_source import * +from .airbyte_resource import * diff --git a/posthog/warehouse/models/airbyte_source.py b/posthog/warehouse/models/airbyte_resource.py similarity index 72% rename from posthog/warehouse/models/airbyte_source.py rename to posthog/warehouse/models/airbyte_resource.py index 0b05df46f9bcf..bcb5adc52d6c4 100644 --- a/posthog/warehouse/models/airbyte_source.py +++ b/posthog/warehouse/models/airbyte_resource.py @@ -3,8 +3,9 @@ from posthog.models.team import Team -class AirbyteSource(CreatedMetaFields, UUIDModel): +class AirbyteResource(CreatedMetaFields, UUIDModel): source_id: models.CharField = models.CharField(max_length=400) + connection_id: models.CharField = models.CharField(max_length=400) team: models.ForeignKey = models.ForeignKey(Team, on_delete=models.CASCADE) __repr__ = sane_repr("source_id")