Commit 99ee056

Merge branch 'master' into dw-use-limiting-logic
EDsCODE authored Jul 23, 2024
2 parents dfd6b6e + 9ce612f commit 99ee056
Showing 10 changed files with 202 additions and 20 deletions.
(Two of the ten changed files could not be displayed.)

10 changes: 9 additions & 1 deletion frontend/src/lib/lemon-ui/LemonDialog/LemonDialog.tsx
@@ -131,7 +131,15 @@ export const LemonFormDialog = ({
     }, [])

     return (
-        <Form logic={lemonDialogLogic} formKey="form">
+        <Form
+            logic={lemonDialogLogic}
+            formKey="form"
+            onKeyDown={(e: React.KeyboardEvent<HTMLFormElement>): void => {
+                if (e.key === 'Enter' && primaryButton?.htmlType === 'submit' && isFormValid) {
+                    void onSubmit(form)
+                }
+            }}
+        >
             <LemonDialog {...props} primaryButton={primaryButton} secondaryButton={secondaryButton} />
         </Form>
     )
29 changes: 27 additions & 2 deletions frontend/src/scenes/insights/insightLogic.test.ts
@@ -40,6 +40,8 @@ const Insight42 = '42' as InsightShortId
 const Insight43 = '43' as InsightShortId
 const Insight44 = '44' as InsightShortId

+const MOCK_DASHBOARD_ID = 34
+
 const partialInsight43 = {
     id: 43,
     short_id: Insight43,
@@ -61,6 +63,7 @@ const patchResponseFor = (
         description: id === '42' ? undefined : 'Lorem ipsum.',
         tags: id === '42' ? undefined : ['good'],
         dashboards: payload['dashboards'],
+        dashboard_tiles: id === '43' ? [{ dashboard_id: MOCK_DASHBOARD_ID }] : undefined,
     }
 }

@@ -192,6 +195,23 @@ describe('insightLogic', () => {
                 },
             ],
         },
+        '/api/projects/:team/dashboards/34/': {
+            id: 33,
+            filters: {},
+            tiles: [
+                {
+                    layouts: {},
+                    color: null,
+                    insight: {
+                        id: 42,
+                        short_id: Insight43,
+                        result: 'result!',
+                        filters: { insight: InsightType.TRENDS, interval: 'month' },
+                        tags: ['bla'],
+                    },
+                },
+            ],
+        },
     },
     post: {
         '/api/projects/:team/insights/funnel/': { result: ['result from api'] },
@@ -513,14 +533,19 @@ describe('insightLogic', () => {
     })

     test('saveInsight updates dashboards', async () => {
+        const dashLogic = dashboardLogic({ id: MOCK_DASHBOARD_ID })
+        dashLogic.mount()
+        await expectLogic(dashLogic).toDispatchActions(['loadDashboard'])
+
         savedInsightsLogic.mount()

         logic = insightLogic({
             dashboardItemId: Insight43,
         })
         logic.mount()

         logic.actions.saveInsight()
-        await expectLogic(dashboardsModel).toDispatchActions(['updateDashboardInsight'])
+
+        await expectLogic(dashLogic).toDispatchActions(['loadDashboard'])
     })

     test('updateInsight updates dashboards', async () => {
11 changes: 11 additions & 0 deletions frontend/src/scenes/insights/insightLogic.ts
@@ -8,6 +8,7 @@ import { lemonToast } from 'lib/lemon-ui/LemonToast/LemonToast'
 import { featureFlagLogic } from 'lib/logic/featureFlagLogic'
 import { objectsEqual } from 'lib/utils'
 import { eventUsageLogic, InsightEventSource } from 'lib/utils/eventUsageLogic'
+import { dashboardLogic } from 'scenes/dashboard/dashboardLogic'
 import { insightSceneLogic } from 'scenes/insights/insightSceneLogic'
 import { keyForInsightLogicProps } from 'scenes/insights/sharedUtils'
 import { summarizeInsight } from 'scenes/insights/summarizeInsight'
@@ -425,6 +426,16 @@ export const insightLogic = kea<insightLogicType>([

             dashboardsModel.actions.updateDashboardInsight(savedInsight)

+            // Reload every dashboard that contains the updated insight: a dashboard's
+            // filters may differ from the insight's own, so each one must refetch
+            // results for the updated insight query.
+            savedInsight.dashboard_tiles?.forEach(({ dashboard_id }) =>
+                dashboardLogic.findMounted({ id: dashboard_id })?.actions.loadDashboard({
+                    action: 'update',
+                    refresh: 'lazy_async',
+                })
+            )
+
             const mountedInsightSceneLogic = insightSceneLogic.findMounted()
             if (redirectToViewMode) {
                 if (!insightNumericId && dashboards?.length === 1) {
13 changes: 13 additions & 0 deletions posthog/temporal/data_imports/external_data_job.py
@@ -32,6 +32,14 @@
     ExternalDataSource,
 )
 from posthog.temporal.common.logger import bind_temporal_worker_logger
+from posthog.warehouse.models.external_data_schema import aupdate_should_sync
+
+
+Non_Retryable_Schema_Errors = [
+    "NoSuchTableError",
+    "401 Client Error: Unauthorized for url: https://api.stripe.com",
+    "403 Client Error: Forbidden for url: https://api.stripe.com",
+]


 @dataclasses.dataclass
@@ -54,6 +62,11 @@ async def update_external_data_job_model(inputs: UpdateExternalDataJobStatusInputs):
             f"External data job failed for external data schema {inputs.schema_id} with error: {inputs.internal_error}"
         )

+        has_non_retryable_error = any(error in inputs.internal_error for error in Non_Retryable_Schema_Errors)
+        if has_non_retryable_error:
+            logger.info("Schema has a non-retryable error - turning off syncing")
+            await aupdate_should_sync(schema_id=inputs.schema_id, team_id=inputs.team_id, should_sync=False)
+
     await sync_to_async(update_external_job_status)(
         run_id=uuid.UUID(inputs.id),
         status=inputs.status,
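
The retryability check above is a plain substring match against the job's internal_error. A minimal, self-contained sketch of the same logic (is_non_retryable is an illustrative name, not part of the diff):

# Sketch: substring matching against the known-fatal error list above.
NON_RETRYABLE_SCHEMA_ERRORS = [
    "NoSuchTableError",
    "401 Client Error: Unauthorized for url: https://api.stripe.com",
    "403 Client Error: Forbidden for url: https://api.stripe.com",
]

def is_non_retryable(internal_error: str) -> bool:
    # Any known fatal message appearing anywhere in the error counts.
    return any(error in internal_error for error in NON_RETRYABLE_SCHEMA_ERRORS)

assert is_non_retryable("NoSuchTableError: TableA")
assert not is_non_retryable("Some other retryable error")

One consequence of substring matching is that any error whose message happens to embed one of these strings also disables syncing, which is why the list sticks to highly specific messages.
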
10 changes: 5 additions & 5 deletions posthog/temporal/data_imports/pipelines/sql_database/__init__.py
@@ -16,6 +16,7 @@
 from urllib.parse import quote

 from posthog.warehouse.types import IncrementalFieldType
+from sqlalchemy.sql import text

 from .helpers import (
     table_rows,
@@ -150,14 +151,13 @@ def sql_database(

 def get_column_hints(engine: Engine, schema_name: str, table_name: str) -> dict[str, TColumnSchema]:
     with engine.connect() as conn:
-        execute_result: CursorResult | None = conn.execute(
-            "SELECT column_name, data_type, numeric_precision, numeric_scale FROM information_schema.columns WHERE table_schema = %(schema_name)s AND table_name = %(table_name)s",
+        execute_result: CursorResult = conn.execute(
+            text(
+                "SELECT column_name, data_type, numeric_precision, numeric_scale FROM information_schema.columns WHERE table_schema = :schema_name AND table_name = :table_name"
+            ),
             {"schema_name": schema_name, "table_name": table_name},
         )

-        if execute_result is None:
-            return {}
-
         cursor_result = cast(CursorResult, execute_result)
         results = cursor_result.fetchall()
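
Background on the change above: SQLAlchemy 2.0 dropped implicit execution of plain SQL strings, so Connection.execute() requires an explicit text() construct, and the portable bound-parameter style is :name rather than the driver-specific %(name)s. That also explains the deleted guard: execute() returns a CursorResult or raises, never None. A self-contained sketch of the pattern against an in-memory SQLite engine (table and values are illustrative):

from sqlalchemy import create_engine
from sqlalchemy.sql import text

engine = create_engine("sqlite://")

with engine.connect() as conn:
    conn.execute(text("CREATE TABLE t (a INTEGER)"))
    conn.execute(text("INSERT INTO t (a) VALUES (:a)"), {"a": 1})
    # :name placeholders are bound by the driver -- no string interpolation.
    result = conn.execute(text("SELECT a FROM t WHERE a = :a"), {"a": 1})
    print(result.fetchall())  # [(1,)]
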
93 changes: 93 additions & 0 deletions posthog/temporal/tests/external_data/test_external_data_job.py
@@ -262,6 +262,99 @@ async def test_update_external_job_activity(activity_environment, team, **kwargs):
     assert schema.status == ExternalDataJob.Status.COMPLETED


+@pytest.mark.django_db(transaction=True)
+@pytest.mark.asyncio
+async def test_update_external_job_activity_with_retryable_error(activity_environment, team, **kwargs):
+    new_source = await sync_to_async(ExternalDataSource.objects.create)(
+        source_id=uuid.uuid4(),
+        connection_id=uuid.uuid4(),
+        destination_id=uuid.uuid4(),
+        team=team,
+        status="running",
+        source_type="Stripe",
+    )
+
+    schema = await sync_to_async(ExternalDataSchema.objects.create)(
+        name=PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING[new_source.source_type][0],
+        team_id=team.id,
+        source_id=new_source.pk,
+        should_sync=True,
+    )
+
+    new_job = await sync_to_async(create_external_data_job)(
+        team_id=team.id,
+        external_data_source_id=new_source.pk,
+        workflow_id=activity_environment.info.workflow_id,
+        workflow_run_id=activity_environment.info.workflow_run_id,
+        external_data_schema_id=schema.id,
+    )
+
+    inputs = UpdateExternalDataJobStatusInputs(
+        id=str(new_job.id),
+        run_id=str(new_job.id),
+        status=ExternalDataJob.Status.COMPLETED,
+        latest_error=None,
+        internal_error="Some other retryable error",
+        schema_id=str(schema.pk),
+        team_id=team.id,
+    )
+
+    await activity_environment.run(update_external_data_job_model, inputs)
+    await sync_to_async(new_job.refresh_from_db)()
+    await sync_to_async(schema.refresh_from_db)()
+
+    assert new_job.status == ExternalDataJob.Status.COMPLETED
+    assert schema.status == ExternalDataJob.Status.COMPLETED
+    assert schema.should_sync is True
+
+
+@pytest.mark.django_db(transaction=True)
+@pytest.mark.asyncio
+async def test_update_external_job_activity_with_non_retryable_error(activity_environment, team, **kwargs):
+    new_source = await sync_to_async(ExternalDataSource.objects.create)(
+        source_id=uuid.uuid4(),
+        connection_id=uuid.uuid4(),
+        destination_id=uuid.uuid4(),
+        team=team,
+        status="running",
+        source_type="Stripe",
+    )
+
+    schema = await sync_to_async(ExternalDataSchema.objects.create)(
+        name=PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING[new_source.source_type][0],
+        team_id=team.id,
+        source_id=new_source.pk,
+        should_sync=True,
+    )
+
+    new_job = await sync_to_async(create_external_data_job)(
+        team_id=team.id,
+        external_data_source_id=new_source.pk,
+        workflow_id=activity_environment.info.workflow_id,
+        workflow_run_id=activity_environment.info.workflow_run_id,
+        external_data_schema_id=schema.id,
+    )
+
+    inputs = UpdateExternalDataJobStatusInputs(
+        id=str(new_job.id),
+        run_id=str(new_job.id),
+        status=ExternalDataJob.Status.COMPLETED,
+        latest_error=None,
+        internal_error="NoSuchTableError: TableA",
+        schema_id=str(schema.pk),
+        team_id=team.id,
+    )
+    with mock.patch("posthog.warehouse.models.external_data_schema.external_data_workflow_exists", return_value=False):
+        await activity_environment.run(update_external_data_job_model, inputs)
+
+    await sync_to_async(new_job.refresh_from_db)()
+    await sync_to_async(schema.refresh_from_db)()
+
+    assert new_job.status == ExternalDataJob.Status.COMPLETED
+    assert schema.status == ExternalDataJob.Status.COMPLETED
+    assert schema.should_sync is False
+
+
 @pytest.mark.django_db(transaction=True)
 @pytest.mark.asyncio
 async def test_run_stripe_job(activity_environment, team, minio_client, **kwargs):
30 changes: 18 additions & 12 deletions posthog/warehouse/data_load/service.py
@@ -1,5 +1,6 @@
 from dataclasses import asdict
 from datetime import timedelta
+from typing import TYPE_CHECKING

 from temporalio.client import (
     Schedule,
@@ -28,18 +29,19 @@
     unpause_schedule,
 )
 from posthog.temporal.utils import ExternalDataWorkflowInputs
-from posthog.warehouse.models import ExternalDataSource
 import temporalio
 from temporalio.client import Client as TemporalClient
 from asgiref.sync import async_to_sync

 from django.conf import settings
 import s3fs

-from posthog.warehouse.models.external_data_schema import ExternalDataSchema
+if TYPE_CHECKING:
+    from posthog.warehouse.models import ExternalDataSource
+    from posthog.warehouse.models.external_data_schema import ExternalDataSchema


-def get_sync_schedule(external_data_schema: ExternalDataSchema):
+def get_sync_schedule(external_data_schema: "ExternalDataSchema"):
     inputs = ExternalDataWorkflowInputs(
         team_id=external_data_schema.team_id,
         external_data_schema_id=external_data_schema.id,
@@ -66,7 +68,9 @@ def get_sync_schedule(external_data_schema: ExternalDataSchema):
     )


-def get_sync_frequency(external_data_schema: ExternalDataSchema):
+def get_sync_frequency(external_data_schema: "ExternalDataSchema"):
+    from posthog.warehouse.models.external_data_schema import ExternalDataSchema
+
     if external_data_schema.sync_frequency == ExternalDataSchema.SyncFrequency.DAILY:
         return timedelta(days=1)
     elif external_data_schema.sync_frequency == ExternalDataSchema.SyncFrequency.WEEKLY:
@@ -78,8 +82,8 @@ def get_sync_frequency(external_data_schema: ExternalDataSchema):


 def sync_external_data_job_workflow(
-    external_data_schema: ExternalDataSchema, create: bool = False
-) -> ExternalDataSchema:
+    external_data_schema: "ExternalDataSchema", create: bool = False
+) -> "ExternalDataSchema":
     temporal = sync_connect()

     schedule = get_sync_schedule(external_data_schema)
@@ -93,8 +97,8 @@ def sync_external_data_job_workflow(


 async def a_sync_external_data_job_workflow(
-    external_data_schema: ExternalDataSchema, create: bool = False
-) -> ExternalDataSchema:
+    external_data_schema: "ExternalDataSchema", create: bool = False
+) -> "ExternalDataSchema":
     temporal = await async_connect()

     schedule = get_sync_schedule(external_data_schema)
@@ -107,17 +111,17 @@ async def a_sync_external_data_job_workflow(
     return external_data_schema


-def trigger_external_data_source_workflow(external_data_source: ExternalDataSource):
+def trigger_external_data_source_workflow(external_data_source: "ExternalDataSource"):
     temporal = sync_connect()
     trigger_schedule(temporal, schedule_id=str(external_data_source.id))


-def trigger_external_data_workflow(external_data_schema: ExternalDataSchema):
+def trigger_external_data_workflow(external_data_schema: "ExternalDataSchema"):
     temporal = sync_connect()
     trigger_schedule(temporal, schedule_id=str(external_data_schema.id))


-async def a_trigger_external_data_workflow(external_data_schema: ExternalDataSchema):
+async def a_trigger_external_data_workflow(external_data_schema: "ExternalDataSchema"):
     temporal = await async_connect()
     await a_trigger_schedule(temporal, schedule_id=str(external_data_schema.id))

@@ -153,7 +157,7 @@ def delete_external_data_schedule(schedule_id: str):
         raise


-async def a_delete_external_data_schedule(external_data_source: ExternalDataSource):
+async def a_delete_external_data_schedule(external_data_source: "ExternalDataSource"):
     temporal = await async_connect()
     try:
         await a_delete_schedule(temporal, schedule_id=str(external_data_source.id))
@@ -185,4 +189,6 @@ def delete_data_import_folder(folder_path: str):


 def is_any_external_data_job_paused(team_id: int) -> bool:
+    from posthog.warehouse.models import ExternalDataSource
+
     return ExternalDataSource.objects.filter(team_id=team_id, status=ExternalDataSource.Status.PAUSED).exists()
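
The TYPE_CHECKING changes above break an import cycle: service.py needs the warehouse model classes only for type annotations, so those imports now run solely under the type checker, with string annotations and function-local imports covering runtime use (external_data_schema.py imports service.py at module load, which is what creates the cycle). A two-file sketch of the pattern, with hypothetical module and class names:

# --- models.py (hypothetical) ---
from service import widget_is_active  # imports service.py at load time

class Widget:
    class Status:
        ACTIVE = "active"

    def __init__(self, status: str) -> None:
        self.status = status

# --- service.py (hypothetical) ---
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Seen only by type checkers; never executed at runtime, so loading
    # service.py does not pull in models.py and the cycle is broken.
    from models import Widget

def widget_is_active(widget: "Widget") -> bool:
    # Deferred runtime import: models.py has finished loading by the time
    # this function is first called.
    from models import Widget

    return widget.status == Widget.Status.ACTIVE
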
26 changes: 26 additions & 0 deletions posthog/warehouse/models/external_data_schema.py
@@ -6,6 +6,12 @@
 from posthog.models.utils import CreatedMetaFields, UUIDModel, sane_repr
 import uuid
 import psycopg2
+from posthog.warehouse.data_load.service import (
+    external_data_workflow_exists,
+    pause_external_data_schedule,
+    sync_external_data_job_workflow,
+    unpause_external_data_schedule,
+)
 from posthog.warehouse.types import IncrementalFieldType
 from posthog.warehouse.models.ssh_tunnel import SSHTunnel
 from posthog.warehouse.util import database_sync_to_async
@@ -78,6 +84,26 @@ def aget_schema_by_id(schema_id: str, team_id: int) -> ExternalDataSchema | None:
     return ExternalDataSchema.objects.prefetch_related("source").get(id=schema_id, team_id=team_id)


+@database_sync_to_async
+def aupdate_should_sync(schema_id: str, team_id: int, should_sync: bool) -> ExternalDataSchema | None:
+    schema = ExternalDataSchema.objects.get(id=schema_id, team_id=team_id)
+    schema.should_sync = should_sync
+    schema.save()
+
+    schedule_exists = external_data_workflow_exists(schema_id)
+
+    if schedule_exists:
+        if should_sync is False:
+            pause_external_data_schedule(schema_id)
+        elif should_sync is True:
+            unpause_external_data_schedule(schema_id)
+    else:
+        if should_sync is True:
+            sync_external_data_job_workflow(schema, create=True)
+
+    return schema
+
+
 @database_sync_to_async
 def get_active_schemas_for_source_id(source_id: uuid.UUID, team_id: int):
     return list(ExternalDataSchema.objects.filter(team_id=team_id, source_id=source_id, should_sync=True).all())
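
aupdate_should_sync above pairs the flag update with schedule management: an existing Temporal schedule is paused or unpaused to match the flag, while a missing schedule is created only when syncing turns on. A minimal sketch of just that branching (apply_should_sync and the returned action names are illustrative, not part of the codebase):

def apply_should_sync(schedule_exists: bool, should_sync: bool) -> str:
    # Mirrors the branching in aupdate_should_sync.
    if schedule_exists:
        # A schedule already exists: toggle it to match the flag.
        return "pause" if not should_sync else "unpause"
    # No schedule yet: create one only when syncing is being enabled.
    return "create" if should_sync else "noop"

assert apply_should_sync(True, False) == "pause"
assert apply_should_sync(True, True) == "unpause"
assert apply_should_sync(False, True) == "create"
assert apply_should_sync(False, False) == "noop"
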
