Skip to content

Commit

Permalink
tested
Browse files Browse the repository at this point in the history
  • Loading branch information
EDsCODE committed Dec 21, 2023
1 parent 9c6647a commit 7118e64
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { actions, afterMount, kea, listeners, path, reducers, selectors } from 'kea'
import { loaders } from 'kea-loaders'
import api, { PaginatedResponse } from 'lib/api'
import { lemonToast } from 'lib/lemon-ui/LemonToast/LemonToast'
import { Scene } from 'scenes/sceneTypes'
import { urls } from 'scenes/urls'

Expand Down Expand Up @@ -80,8 +81,16 @@ export const dataWarehouseSettingsLogic = kea<dataWarehouseSettingsLogicType>([
actions.loadingFinished(source)
},
reloadSource: async ({ source }) => {
await api.externalDataSources.reload(source.id)
actions.loadSources()
try {
await api.externalDataSources.reload(source.id)
actions.loadSources()
} catch (e: any) {
if (e.message) {
lemonToast.error(e.message)
} else {
lemonToast.error('Cant refresh source at this time')
}
}
actions.loadingFinished(source)
},
updateSchema: async ({ schema }) => {
Expand Down
6 changes: 3 additions & 3 deletions posthog/tasks/warehouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,9 @@ def _traverse_jobs_by_field(


def check_synced_row_limits() -> None:
teams = ExternalDataSource.objects.values_list("team", flat=True)
for team in teams:
check_synced_row_limits_of_team.delay(team.pk)
team_ids = ExternalDataSource.objects.values_list("team", flat=True)
for team_id in team_ids:
check_synced_row_limits_of_team.delay(team_id)


@app.task(ignore_result=True)
Expand Down
14 changes: 9 additions & 5 deletions posthog/temporal/data_imports/pipelines/helpers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from posthog.warehouse.models import ExternalDataJob
from django.db.models import F

CHUNK_SIZE = 10000
CHUNK_SIZE = 10_000


def limit_paginated_generator(f):
Expand All @@ -21,14 +22,17 @@ def wrapped(**kwargs):
count = 0
for item in gen:
if count >= CHUNK_SIZE:
model.rows_synced += count
model.save()
ExternalDataJob.objects.filter(id=job_id, team_id=team_id).update(rows_synced=F("rows_synced") + count)
count = 0

model.refresh_from_db()

if model.status == ExternalDataJob.Status.CANCELLED:
break

yield item
count += len(item)

model.rows_synced += count
model.save()
ExternalDataJob.objects.filter(id=job_id, team_id=team_id).update(rows_synced=F("rows_synced") + count)

return wrapped
18 changes: 16 additions & 2 deletions posthog/warehouse/api/external_data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from posthog.temporal.data_imports.pipelines.schemas import (
PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING,
)
import temporalio

logger = structlog.get_logger(__name__)

Expand Down Expand Up @@ -147,7 +148,11 @@ def create(self, request: Request, *args: Any, **kwargs: Any) -> Response:
source=new_source_model,
)

sync_external_data_job_workflow(new_source_model, create=True)
try:
sync_external_data_job_workflow(new_source_model, create=True)
except Exception as e:
# Log error but don't fail because the source model was already created
logger.exception("Could not trigger external data job", exc_info=e)

return Response(status=status.HTTP_201_CREATED, data={"id": new_source_model.pk})

Expand Down Expand Up @@ -199,7 +204,16 @@ def reload(self, request: Request, *args: Any, **kwargs: Any):
data={"message": "Monthly sync limit reached. Please contact PostHog support to increase your limit."},
)

trigger_external_data_workflow(instance)
try:
trigger_external_data_workflow(instance)

except temporalio.service.RPCError as e:
# schedule doesn't exist
if e.message == "sql: no rows in result set":
sync_external_data_job_workflow(instance, create=True)
except Exception as e:
logger.exception("Could not trigger external data job", exc_info=e)
raise

instance.status = "Running"
instance.save()
Expand Down
10 changes: 5 additions & 5 deletions posthog/warehouse/models/external_data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ class Type(models.TextChoices):
STRIPE = "Stripe", "Stripe"

class Status(models.TextChoices):
RUNNING = "running", "Running"
PAUSED = "paused", "Paused"
ERROR = "error", "Error"
COMPLETED = "completed", "Completed"
CANCELLED = "cancelled", "Cancelled"
RUNNING = "Running", "Running"
PAUSED = "Paused", "Paused"
ERROR = "Error", "Error"
COMPLETED = "Completed", "Completed"
CANCELLED = "Cancelled", "Cancelled"

source_id: models.CharField = models.CharField(max_length=400)
connection_id: models.CharField = models.CharField(max_length=400)
Expand Down

0 comments on commit 7118e64

Please sign in to comment.