From 7118e64edad40288ae34bff09876f3ba7147b674 Mon Sep 17 00:00:00 2001
From: eric
Date: Thu, 21 Dec 2023 15:26:21 -0500
Subject: [PATCH] tested

---
 .../settings/dataWarehouseSettingsLogic.ts     | 13 +++++++++++--
 posthog/tasks/warehouse.py                     |  6 +++---
 .../temporal/data_imports/pipelines/helpers.py | 14 +++++++++-----
 posthog/warehouse/api/external_data_source.py  | 18 ++++++++++++++++--
 .../warehouse/models/external_data_source.py   | 10 +++++-----
 5 files changed, 44 insertions(+), 17 deletions(-)

diff --git a/frontend/src/scenes/data-warehouse/settings/dataWarehouseSettingsLogic.ts b/frontend/src/scenes/data-warehouse/settings/dataWarehouseSettingsLogic.ts
index c39b308145fe1..439cf8d14c7d3 100644
--- a/frontend/src/scenes/data-warehouse/settings/dataWarehouseSettingsLogic.ts
+++ b/frontend/src/scenes/data-warehouse/settings/dataWarehouseSettingsLogic.ts
@@ -1,6 +1,7 @@
 import { actions, afterMount, kea, listeners, path, reducers, selectors } from 'kea'
 import { loaders } from 'kea-loaders'
 import api, { PaginatedResponse } from 'lib/api'
+import { lemonToast } from 'lib/lemon-ui/LemonToast/LemonToast'
 
 import { Scene } from 'scenes/sceneTypes'
 import { urls } from 'scenes/urls'
@@ -80,8 +81,16 @@ export const dataWarehouseSettingsLogic = kea<dataWarehouseSettingsLogicType>([
             actions.loadingFinished(source)
         },
         reloadSource: async ({ source }) => {
-            await api.externalDataSources.reload(source.id)
-            actions.loadSources()
+            try {
+                await api.externalDataSources.reload(source.id)
+                actions.loadSources()
+            } catch (e: any) {
+                if (e.message) {
+                    lemonToast.error(e.message)
+                } else {
+                    lemonToast.error("Can't refresh source at this time")
+                }
+            }
             actions.loadingFinished(source)
         },
         updateSchema: async ({ schema }) => {
diff --git a/posthog/tasks/warehouse.py b/posthog/tasks/warehouse.py
index 29e0804d4c428..fc00a3f9be0cb 100644
--- a/posthog/tasks/warehouse.py
+++ b/posthog/tasks/warehouse.py
@@ -175,9 +175,9 @@ def _traverse_jobs_by_field(
 
 
 def check_synced_row_limits() -> None:
-    teams = ExternalDataSource.objects.values_list("team", flat=True)
-    for team in teams:
-        check_synced_row_limits_of_team.delay(team.pk)
+    team_ids = ExternalDataSource.objects.values_list("team", flat=True)
+    for team_id in team_ids:
+        check_synced_row_limits_of_team.delay(team_id)
 
 
 @app.task(ignore_result=True)
diff --git a/posthog/temporal/data_imports/pipelines/helpers.py b/posthog/temporal/data_imports/pipelines/helpers.py
index d2100a7498dd7..753cce2ea9cb4 100644
--- a/posthog/temporal/data_imports/pipelines/helpers.py
+++ b/posthog/temporal/data_imports/pipelines/helpers.py
@@ -1,6 +1,7 @@
 from posthog.warehouse.models import ExternalDataJob
+from django.db.models import F
 
-CHUNK_SIZE = 10000
+CHUNK_SIZE = 10_000
 
 
 def limit_paginated_generator(f):
@@ -21,14 +22,17 @@ def wrapped(**kwargs):
         count = 0
         for item in gen:
             if count >= CHUNK_SIZE:
-                model.rows_synced += count
-                model.save()
+                ExternalDataJob.objects.filter(id=job_id, team_id=team_id).update(rows_synced=F("rows_synced") + count)
                 count = 0
 
+                model.refresh_from_db()
+
+                if model.status == ExternalDataJob.Status.CANCELLED:
+                    break
+
             yield item
             count += len(item)
 
-        model.rows_synced += count
-        model.save()
+        ExternalDataJob.objects.filter(id=job_id, team_id=team_id).update(rows_synced=F("rows_synced") + count)
 
     return wrapped
diff --git a/posthog/warehouse/api/external_data_source.py b/posthog/warehouse/api/external_data_source.py
index dc60aec3c0034..48f8babed4a5a 100644
--- a/posthog/warehouse/api/external_data_source.py
+++ b/posthog/warehouse/api/external_data_source.py
@@ -25,6 +25,7 @@
 from posthog.temporal.data_imports.pipelines.schemas import (
     PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING,
 )
+import temporalio
 
 logger = structlog.get_logger(__name__)
 
@@ -147,7 +148,11 @@ def create(self, request: Request, *args: Any, **kwargs: Any) -> Response:
             source=new_source_model,
         )
 
-        sync_external_data_job_workflow(new_source_model, create=True)
+        try:
+            sync_external_data_job_workflow(new_source_model, create=True)
+        except Exception as e:
+            # Log error but don't fail because the source model was already created
+            logger.exception("Could not trigger external data job", exc_info=e)
 
         return Response(status=status.HTTP_201_CREATED, data={"id": new_source_model.pk})
 
@@ -199,7 +204,16 @@ def reload(self, request: Request, *args: Any, **kwargs: Any):
                 data={"message": "Monthly sync limit reached. Please contact PostHog support to increase your limit."},
             )
 
-        trigger_external_data_workflow(instance)
+        try:
+            trigger_external_data_workflow(instance)
+
+        except temporalio.service.RPCError as e:
+            # schedule doesn't exist
+            if e.message == "sql: no rows in result set":
+                sync_external_data_job_workflow(instance, create=True)
+        except Exception as e:
+            logger.exception("Could not trigger external data job", exc_info=e)
+            raise
 
         instance.status = "Running"
         instance.save()
diff --git a/posthog/warehouse/models/external_data_source.py b/posthog/warehouse/models/external_data_source.py
index 78b4be2c555f4..287a4a3f2cd99 100644
--- a/posthog/warehouse/models/external_data_source.py
+++ b/posthog/warehouse/models/external_data_source.py
@@ -10,11 +10,11 @@ class Type(models.TextChoices):
         STRIPE = "Stripe", "Stripe"
 
     class Status(models.TextChoices):
-        RUNNING = "running", "Running"
-        PAUSED = "paused", "Paused"
-        ERROR = "error", "Error"
-        COMPLETED = "completed", "Completed"
-        CANCELLED = "cancelled", "Cancelled"
+        RUNNING = "Running", "Running"
+        PAUSED = "Paused", "Paused"
+        ERROR = "Error", "Error"
+        COMPLETED = "Completed", "Completed"
+        CANCELLED = "Cancelled", "Cancelled"
 
     source_id: models.CharField = models.CharField(max_length=400)
     connection_id: models.CharField = models.CharField(max_length=400)
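
Reviewer note: the Python side of this patch leans on two Django ORM idioms that are easy to misread. values_list("team", flat=True) yields raw foreign-key values, not model instances, which is why the old team.pk call in check_synced_row_limits was broken: the loop variable is already the integer team id. And filter(...).update(rows_synced=F("rows_synced") + count) pushes the increment into a single UPDATE statement, so concurrent workers cannot overwrite each other's counts the way read-modify-write saves can. The sketch below is a minimal, self-contained illustration of both, using a hypothetical Job model on in-memory SQLite rather than PostHog's actual ExternalDataJob:

    import django
    from django.conf import settings

    # Stand-alone bootstrap so the sketch runs outside any Django project.
    settings.configure(
        DATABASES={"default": {"ENGINE": "django.db.backends.sqlite3", "NAME": ":memory:"}},
        INSTALLED_APPS=[],
    )
    django.setup()

    from django.db import connection, models
    from django.db.models import F

    class Job(models.Model):
        # Hypothetical stand-in for ExternalDataJob.
        team_id = models.IntegerField()
        rows_synced = models.IntegerField(default=0)

        class Meta:
            app_label = "sketch"

    with connection.schema_editor() as editor:
        editor.create_model(Job)

    Job.objects.create(team_id=1)
    Job.objects.create(team_id=2)

    # flat=True yields the raw column values: plain ints with no .pk attribute,
    # matching the warehouse.py fix.
    team_ids = Job.objects.values_list("team_id", flat=True)
    assert sorted(team_ids) == [1, 2]

    # Read-modify-write (job.rows_synced += n; job.save()) can lose updates when
    # two workers interleave their reads. The F() form compiles to
    #   UPDATE sketch_job SET rows_synced = rows_synced + 100 WHERE id = ...
    # so the addition happens inside the database in one statement.
    job = Job.objects.get(team_id=1)
    Job.objects.filter(id=job.id).update(rows_synced=F("rows_synced") + 100)

    # .update() bypasses the in-memory instance; refresh_from_db() re-reads the
    # row, which is also how the wrapped generator in helpers.py notices a
    # concurrent CANCELLED status and stops early.
    job.refresh_from_db()
    assert job.rows_synced == 100

The refresh-after-update pattern costs one extra SELECT per CHUNK_SIZE rows, which is cheap relative to the sync itself and is what makes the new cancellation check possible.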