Skip to content

Commit

Permalink
chore(data-warehouse): add bucket name env var (#18391)
Browse files Browse the repository at this point in the history
* add env var

* add info to error logs

* add test workspace name

* bucket name

* updates

* Update UI snapshots for `chromium` (1)

* migrations

---------

Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
  • Loading branch information
EDsCODE and github-actions[bot] authored Nov 6, 2023
1 parent 91cadb8 commit fd61058
Show file tree
Hide file tree
Showing 13 changed files with 169 additions and 11 deletions.
10 changes: 10 additions & 0 deletions frontend/src/lib/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,10 @@ class ApiRequest {
return this.projectsDetail(teamId).addPathComponent('external_data_sources')
}

public externalDataSource(sourceId: ExternalDataStripeSource['id'], teamId?: TeamType['id']): ApiRequest {
return this.externalDataSources(teamId).addPathComponent(sourceId)
}

// Request finalization
public async get(options?: ApiMethodOptions): Promise<any> {
return await api.get(this.assembleFullUrl(), options)
Expand Down Expand Up @@ -1587,6 +1591,12 @@ const api = {
): Promise<ExternalDataStripeSourceCreatePayload> {
return await new ApiRequest().externalDataSources().create({ data })
},
async delete(sourceId: ExternalDataStripeSource['id']): Promise<void> {
await new ApiRequest().externalDataSource(sourceId).delete()
},
async reload(sourceId: ExternalDataStripeSource['id']): Promise<void> {
await new ApiRequest().externalDataSource(sourceId).withAction('reload').create()
},
},

dataWarehouseViewLinks: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import { dataWarehouseSceneLogic } from '../external/dataWarehouseSceneLogic'
import SourceModal from '../external/SourceModal'
import { featureFlagLogic } from 'lib/logic/featureFlagLogic'
import { FEATURE_FLAGS } from 'lib/constants'
import { More } from 'lib/lemon-ui/LemonButton/More'
import { LoadingOutlined } from '@ant-design/icons'

export const scene: SceneExport = {
component: DataWarehouseSettingsScene,
Expand All @@ -20,7 +22,9 @@ const StatusTagSetting = {
}

export function DataWarehouseSettingsScene(): JSX.Element {
const { dataWarehouseSources, dataWarehouseSourcesLoading } = useValues(dataWarehouseSettingsLogic)
const { dataWarehouseSources, dataWarehouseSourcesLoading, sourceReloadingById } =
useValues(dataWarehouseSettingsLogic)
const { deleteSource, reloadSource } = useActions(dataWarehouseSettingsLogic)
const { toggleSourceModal } = useActions(dataWarehouseSceneLogic)
const { isSourceModalOpen } = useValues(dataWarehouseSceneLogic)
const { featureFlags } = useValues(featureFlagLogic)
Expand Down Expand Up @@ -77,6 +81,50 @@ export function DataWarehouseSettingsScene(): JSX.Element {
)
},
},
{
key: 'actions',
width: 0,
render: function RenderActions(_, source) {
return (
<div className="flex flex-row justify-end">
{sourceReloadingById[source.id] ? (
<div>
<LoadingOutlined />
</div>
) : (
<div>
<More
overlay={
<>
<LemonButton
type="tertiary"
data-attr={`reload-data-warehouse-${source.source_type}`}
key={`reload-data-warehouse-${source.source_type}`}
onClick={() => {
reloadSource(source)
}}
>
Reload
</LemonButton>
<LemonButton
status="danger"
data-attr={`delete-data-warehouse-${source.source_type}`}
key={`delete-data-warehouse-${source.source_type}`}
onClick={() => {
deleteSource(source)
}}
>
Delete
</LemonButton>
</>
}
/>
</div>
)}
</div>
)
},
},
]}
/>
<SourceModal isOpen={isSourceModalOpen} onClose={toggleSourceModal} />
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { afterMount, kea, path, selectors } from 'kea'
import { actions, afterMount, kea, listeners, path, reducers, selectors } from 'kea'

import type { dataWarehouseSettingsLogicType } from './dataWarehouseSettingsLogicType'
import { loaders } from 'kea-loaders'
Expand All @@ -10,6 +10,11 @@ export interface DataWarehouseSource {}

export const dataWarehouseSettingsLogic = kea<dataWarehouseSettingsLogicType>([
path(['scenes', 'data-warehouse', 'settings', 'dataWarehouseSettingsLogic']),
actions({
deleteSource: (source: ExternalDataStripeSource) => ({ source }),
reloadSource: (source: ExternalDataStripeSource) => ({ source }),
loadingFinished: (source: ExternalDataStripeSource) => ({ source }),
}),
loaders({
dataWarehouseSources: [
null as PaginatedResponse<ExternalDataStripeSource> | null,
Expand All @@ -20,6 +25,25 @@ export const dataWarehouseSettingsLogic = kea<dataWarehouseSettingsLogicType>([
},
],
}),
reducers({
sourceReloadingById: [
{} as Record<string, boolean>,
{
reloadSource: (state, { source }) => ({
...state,
[source.id]: true,
}),
deleteSource: (state, { source }) => ({
...state,
[source.id]: true,
}),
loadingFinished: (state, { source }) => ({
...state,
[source.id]: false,
}),
},
],
}),
selectors({
breadcrumbs: [
() => [],
Expand All @@ -35,6 +59,18 @@ export const dataWarehouseSettingsLogic = kea<dataWarehouseSettingsLogicType>([
],
],
}),
listeners(({ actions }) => ({
deleteSource: async ({ source }) => {
await api.externalDataSources.delete(source.id)
actions.loadSources()
actions.loadingFinished(source)
},
reloadSource: async ({ source }) => {
await api.externalDataSources.reload(source.id)
actions.loadSources()
actions.loadingFinished(source)
},
})),
afterMount(({ actions }) => {
actions.loadSources()
}),
Expand Down
2 changes: 1 addition & 1 deletion latest_migrations.manifest
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name
ee: 0015_add_verified_properties
otp_static: 0002_throttling
otp_totp: 0002_auto_20190420_0723
posthog: 0359_team_external_data_workspace_id
posthog: 0360_externaldatasource_destination_id
sessions: 0001_initial
social_django: 0010_uid_db_index
two_factor: 0007_auto_20201201_1019
17 changes: 17 additions & 0 deletions posthog/migrations/0360_externaldatasource_destination_id.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Generated by Django 3.2.19 on 2023-11-03 18:40

from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
("posthog", "0359_team_external_data_workspace_id"),
]

operations = [
migrations.AddField(
model_name="externaldatasource",
name="destination_id",
field=models.CharField(blank=True, max_length=400, null=True),
),
]
1 change: 1 addition & 0 deletions posthog/settings/airbyte.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@
AIRBYTE_BUCKET_KEY = os.getenv("AIRBYTE_BUCKET_KEY", None)
AIRBYTE_BUCKET_SECRET = os.getenv("AIRBYTE_BUCKET_SECRET", None)
AIRBYTE_BUCKET_DOMAIN = os.getenv("AIRBYTE_BUCKET_DOMAIN", None)
AIRBYTE_BUCKET_NAME = os.getenv("AIRBYTE_BUCKET_NAME", None)
33 changes: 33 additions & 0 deletions posthog/warehouse/api/external_data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,15 @@
from posthog.warehouse.external_data_source.source import StripeSourcePayload, create_stripe_source, delete_source
from posthog.warehouse.external_data_source.connection import create_connection, start_sync
from posthog.warehouse.external_data_source.destination import create_destination, delete_destination
from posthog.warehouse.sync_resource import sync_resource
from posthog.api.routing import StructuredViewSetMixin
from rest_framework.decorators import action

from posthog.models import User
from typing import Any
import structlog

logger = structlog.get_logger(__name__)


class ExternalDataSourceSerializers(serializers.ModelSerializer):
Expand Down Expand Up @@ -75,6 +80,7 @@ def create(self, request: Request, *args: Any, **kwargs: Any) -> Response:
ExternalDataSource.objects.create(
source_id=new_source.source_id,
connection_id=new_connection.connection_id,
destination_id=new_destination.destination_id,
team=self.team,
status="running",
source_type="Stripe",
Expand All @@ -83,3 +89,30 @@ def create(self, request: Request, *args: Any, **kwargs: Any) -> Response:
start_sync(new_connection.connection_id)

return Response(status=status.HTTP_201_CREATED, data={"source_id": new_source.source_id})

def destroy(self, request: Request, *args: Any, **kwargs: Any) -> Response:
instance = self.get_object()

try:
delete_source(instance.source_id)
except Exception as e:
logger.exception(
f"Data Warehouse: Failed to delete source with id: {instance.source_id}",
exc_info=e,
)

try:
delete_destination(instance.destination_id)
except Exception as e:
logger.exception(
f"Data Warehouse: Failed to delete destination with id: {instance.destination_id}",
exc_info=e,
)

return super().destroy(request, *args, **kwargs)

@action(methods=["POST"], detail=True)
def reload(self, request: Request, *args: Any, **kwargs: Any):
instance = self.get_object()
sync_resource(instance.id)
return Response(status=status.HTTP_200_OK)
1 change: 1 addition & 0 deletions posthog/warehouse/external_data_source/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ def send_request(path, method, params=None, payload=None):
response = requests.patch(path, json=payload, headers=headers)
elif method == "DELETE":
response = requests.delete(path, headers=headers)
return
else:
raise ValueError(f"Invalid method: {method}")

Expand Down
3 changes: 2 additions & 1 deletion posthog/warehouse/external_data_source/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ def start_sync(connection_id: str):
send_request(AIRBYTE_JOBS_URL, method="POST", payload=payload)
except Exception as e:
logger.exception(
f"Sync Resource failed with an unexpected exception for connection id: {connection_id}", exc_info=e
f"Data Warehouse: Sync Resource failed with an unexpected exception for connection id: {connection_id}",
exc_info=e,
)


Expand Down
2 changes: 1 addition & 1 deletion posthog/warehouse/external_data_source/destination.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def create_destination(team_id: int, workspace_id: str) -> ExternalDataDestinati
"s3_bucket_region": settings.AIRBYTE_BUCKET_REGION,
"access_key_id": settings.AIRBYTE_BUCKET_KEY,
"secret_access_key": settings.AIRBYTE_BUCKET_SECRET,
"s3_bucket_name": "databeach-hackathon",
"s3_bucket_name": settings.AIRBYTE_BUCKET_NAME,
"s3_bucket_path": f"airbyte/{team_id}",
},
"name": f"S3/{team_id}",
Expand Down
8 changes: 7 additions & 1 deletion posthog/warehouse/external_data_source/workspace.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
from posthog.models import Team
from posthog.warehouse.external_data_source.client import send_request
from django.conf import settings

AIRBYTE_WORKSPACE_URL = "https://api.airbyte.com/v1/workspaces"


def create_workspace(team_id: int):
payload = {"name": "Team " + str(team_id)}
if settings.DEBUG or settings.TEST:
workspace_name = "Team " + str(team_id) + " (TEST)"
else:
workspace_name = "Team " + str(team_id)

payload = {"name": workspace_name}
response = send_request(AIRBYTE_WORKSPACE_URL, method="POST", payload=payload)

return response["workspaceId"]
Expand Down
1 change: 1 addition & 0 deletions posthog/warehouse/models/external_data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ class Type(models.TextChoices):

source_id: models.CharField = models.CharField(max_length=400)
connection_id: models.CharField = models.CharField(max_length=400)
destination_id: models.CharField = models.CharField(max_length=400, null=True, blank=True)
team: models.ForeignKey = models.ForeignKey(Team, on_delete=models.CASCADE)
status: models.CharField = models.CharField(max_length=400)
source_type: models.CharField = models.CharField(max_length=128, choices=Type.choices)
Expand Down
14 changes: 9 additions & 5 deletions posthog/warehouse/sync_resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,26 @@ def sync_resources():
resources = ExternalDataSource.objects.filter(are_tables_created=False, status__in=["running", "error"])

for resource in resources:
_sync_resource.delay(resource.pk)
sync_resource.delay(resource.pk)


@app.task(ignore_result=True)
def _sync_resource(resource_id):
def sync_resource(resource_id):
resource = ExternalDataSource.objects.get(pk=resource_id)

try:
job = retrieve_sync(resource.connection_id)
except Exception as e:
logger.exception("Sync Resource failed with an unexpected exception.", exc_info=e)
logger.exception("Data Warehouse: Sync Resource failed with an unexpected exception.", exc_info=e)
resource.status = "error"
resource.save()
return

if job is None:
logger.error(f"No jobs found for connection: {resource.connection_id}")
logger.error(f"Data Warehouse: No jobs found for connection: {resource.connection_id}")
resource.status = "error"
resource.save()
return

if job["status"] == "succeeded":
resource = ExternalDataSource.objects.get(pk=resource_id)
Expand All @@ -53,7 +54,10 @@ def _sync_resource(resource_id):
try:
table.columns = table.get_columns()
except Exception as e:
logger.exception("Sync Resource failed with an unexpected exception.", exc_info=e)
logger.exception(
f"Data Warehouse: Sync Resource failed with an unexpected exception for connection: {resource.connection_id}",
exc_info=e,
)
else:
table.save()

Expand Down

0 comments on commit fd61058

Please sign in to comment.