diff --git a/frontend/src/scenes/data-warehouse/new/NewSourceWizard.tsx b/frontend/src/scenes/data-warehouse/new/NewSourceWizard.tsx index 48085faa831ef..3c61d2705ae0e 100644 --- a/frontend/src/scenes/data-warehouse/new/NewSourceWizard.tsx +++ b/frontend/src/scenes/data-warehouse/new/NewSourceWizard.tsx @@ -1,6 +1,5 @@ import { LemonButton } from '@posthog/lemon-ui' import { useActions, useValues } from 'kea' -import { router } from 'kea-router' import { PageHeader } from 'lib/components/PageHeader' import { FEATURE_FLAGS } from 'lib/constants' import { featureFlagLogic } from 'lib/logic/featureFlagLogic' @@ -10,7 +9,6 @@ import stripeLogo from 'public/stripe-logo.svg' import zendeskLogo from 'public/zendesk-logo.png' import { useCallback } from 'react' import { SceneExport } from 'scenes/sceneTypes' -import { urls } from 'scenes/urls' import { SourceConfig } from '~/types' @@ -26,7 +24,7 @@ export const scene: SceneExport = { } export function NewSourceWizard(): JSX.Element { const { modalTitle, modalCaption } = useValues(sourceWizardLogic) - const { onBack, onSubmit, closeWizard, cancelWizard } = useActions(sourceWizardLogic) + const { onBack, onSubmit, closeWizard } = useActions(sourceWizardLogic) const { currentStep, isLoading, canGoBack, canGoNext, nextButtonText, showSkipButton } = useValues(sourceWizardLogic) @@ -65,17 +63,17 @@ export function NewSourceWizard(): JSX.Element { ) }, [currentStep, isLoading, canGoNext, canGoBack, nextButtonText, showSkipButton]) - const onCancel = (): void => { - cancelWizard() - router.actions.push(urls.dataWarehouse()) - } - return ( <> - + Cancel diff --git a/frontend/src/scenes/data-warehouse/new/sourceWizardLogic.tsx b/frontend/src/scenes/data-warehouse/new/sourceWizardLogic.tsx index 7aa8813f57450..f41a90e086fb7 100644 --- a/frontend/src/scenes/data-warehouse/new/sourceWizardLogic.tsx +++ b/frontend/src/scenes/data-warehouse/new/sourceWizardLogic.tsx @@ -424,6 +424,7 @@ export const sourceWizardLogic = kea([ } }, closeWizard: () => { + actions.onClear() actions.clearSource() actions.loadSources(null) router.actions.push(urls.dataWarehouseSettings()) diff --git a/latest_migrations.manifest b/latest_migrations.manifest index 9d92bebd3cb5c..3909765119557 100644 --- a/latest_migrations.manifest +++ b/latest_migrations.manifest @@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name ee: 0016_rolemembership_organization_member otp_static: 0002_throttling otp_totp: 0002_auto_20190420_0723 -posthog: 0401_experiment_exposure_cohort +posthog: 0402_externaldatajob_schema sessions: 0001_initial social_django: 0010_uid_db_index two_factor: 0007_auto_20201201_1019 diff --git a/mypy-baseline.txt b/mypy-baseline.txt index f167c5bfe002f..713cf17200a43 100644 --- a/mypy-baseline.txt +++ b/mypy-baseline.txt @@ -2,7 +2,6 @@ posthog/temporal/common/utils.py:0: error: Argument 1 to "abstractclassmethod" h posthog/temporal/common/utils.py:0: note: This is likely because "from_activity" has named arguments: "cls". 
Consider marking them positional-only posthog/temporal/common/utils.py:0: error: Argument 2 to "__get__" of "classmethod" has incompatible type "type[HeartbeatType]"; expected "type[Never]" [arg-type] posthog/temporal/data_imports/pipelines/zendesk/talk_api.py:0: error: Incompatible types in assignment (expression has type "None", variable has type "str") [assignment] -posthog/hogql/modifiers.py:0: error: Incompatible types in assignment (expression has type "PersonOnEventsMode", variable has type "PersonsOnEventsMode | None") [assignment] posthog/hogql/database/argmax.py:0: error: Argument "chain" to "Field" has incompatible type "list[str]"; expected "list[str | int]" [arg-type] posthog/hogql/database/argmax.py:0: note: "List" is invariant -- see https://mypy.readthedocs.io/en/stable/common_issues.html#variance posthog/hogql/database/argmax.py:0: note: Consider using "Sequence" instead, which is covariant @@ -134,6 +133,7 @@ posthog/hogql_queries/legacy_compatibility/filter_to_query.py:0: error: Dict ent posthog/hogql_queries/legacy_compatibility/filter_to_query.py:0: error: Dict entry 0 has incompatible type "str": "LifecycleFilter"; expected "str": "TrendsFilter" [dict-item] posthog/hogql_queries/legacy_compatibility/filter_to_query.py:0: error: Dict entry 0 has incompatible type "str": "StickinessFilter"; expected "str": "TrendsFilter" [dict-item] posthog/hogql_queries/legacy_compatibility/feature_flag.py:0: error: Item "AnonymousUser" of "User | AnonymousUser" has no attribute "email" [union-attr] +posthog/hogql/modifiers.py:0: error: Incompatible types in assignment (expression has type "PersonOnEventsMode", variable has type "PersonsOnEventsMode | None") [assignment] posthog/api/utils.py:0: error: Incompatible types in assignment (expression has type "type[EventDefinition]", variable has type "type[EnterpriseEventDefinition]") [assignment] posthog/api/utils.py:0: error: Argument 1 to "UUID" has incompatible type "int | str"; expected "str | None" [arg-type] ee/billing/quota_limiting.py:0: error: List comprehension has incompatible type List[int]; expected List[str] [misc] @@ -167,13 +167,6 @@ posthog/hogql_queries/insights/trends/aggregation_operations.py:0: error: Item " posthog/hogql_queries/insights/trends/aggregation_operations.py:0: error: Item "None" of "list[Expr] | Any | None" has no attribute "append" [union-attr] ee/billing/billing_manager.py:0: error: TypedDict "CustomerInfo" has no key "available_product_features" [typeddict-item] ee/billing/billing_manager.py:0: note: Did you mean "available_features"? 
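# Note on the mypy-baseline churn around here: this PR splits the monolithic
# run_external_data_job activity into create_external_data_job_model_activity and
# import_data_activity under posthog/temporal/data_imports/workflow_activities/, so
# baseline entries recorded against the old external_data_job.py locations are removed
# and equivalent entries reappear under the new module paths. A minimal sketch of the
# new per-schema activity inputs, mirroring the dataclasses added further down in this
# diff (the inline field comments are editorial, not part of the patch):
import dataclasses
import uuid


@dataclasses.dataclass
class CreateExternalDataJobModelActivityInputs:
    team_id: int
    schema_id: uuid.UUID  # new: one job row is created per schema
    source_id: uuid.UUID


@dataclasses.dataclass
class ImportDataActivityInputs:
    team_id: int
    schema_id: uuid.UUID  # replaces the old list[tuple[str, str]] of schemas
    source_id: uuid.UUID
    run_id: str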
-posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "DateTime | Date | datetime | date | str | float | int | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type] -posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "str | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type] -posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "DateTime | Date | datetime | date | str | float | int | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type] -posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "str | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type] -posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "DateTime | Date | datetime | date | str | float | int | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type] -posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Item "None" of "DateTime | None" has no attribute "int_timestamp" [union-attr] -posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "str | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type] posthog/hogql/resolver.py:0: error: Argument 1 of "visit" is incompatible with supertype "Visitor"; supertype defines the argument type as "AST" [override] posthog/hogql/resolver.py:0: note: This violates the Liskov substitution principle posthog/hogql/resolver.py:0: note: See https://mypy.readthedocs.io/en/stable/common_issues.html#incompatible-overrides @@ -233,9 +226,6 @@ posthog/hogql/resolver.py:0: error: Argument 1 to "get_child" of "Type" has inco posthog/hogql/resolver.py:0: error: Incompatible types in assignment (expression has type "Expr", variable has type "Alias") [assignment] posthog/hogql/resolver.py:0: error: Argument "alias" to "Alias" has incompatible type "str | int"; expected "str" [arg-type] posthog/hogql/resolver.py:0: error: Argument 1 to "join" of "str" has incompatible type "list[str | int]"; expected "Iterable[str]" [arg-type] -posthog/temporal/data_imports/external_data_job.py:0: error: Argument "team_id" has incompatible type "int"; expected "str" [arg-type] -posthog/temporal/data_imports/external_data_job.py:0: error: Unused "type: ignore" comment [unused-ignore] -posthog/temporal/data_imports/external_data_job.py:0: error: Argument "team_id" has incompatible type "int"; expected "str" [arg-type] posthog/hogql/transforms/lazy_tables.py:0: error: Incompatible default for argument "context" (default has type "None", argument has type "HogQLContext") [assignment] posthog/hogql/transforms/lazy_tables.py:0: note: PEP 484 prohibits implicit Optional. 
Accordingly, mypy has changed its default to no_implicit_optional=True posthog/hogql/transforms/lazy_tables.py:0: note: Use https://github.com/hauntsaninja/no_implicit_optional to automatically upgrade your codebase @@ -577,6 +567,15 @@ posthog/hogql/database/schema/event_sessions.py:0: error: Statement is unreachab posthog/api/organization_member.py:0: error: Metaclass conflict: the metaclass of a derived class must be a (non-strict) subclass of the metaclasses of all its bases [misc] ee/api/role.py:0: error: Metaclass conflict: the metaclass of a derived class must be a (non-strict) subclass of the metaclasses of all its bases [misc] ee/clickhouse/views/insights.py:0: error: Metaclass conflict: the metaclass of a derived class must be a (non-strict) subclass of the metaclasses of all its bases [misc] +posthog/temporal/data_imports/workflow_activities/create_job_model.py:0: error: Argument 6 has incompatible type "ExternalDataSchema"; expected "str" [arg-type] +posthog/temporal/data_imports/workflow_activities/create_job_model.py:0: error: Unused "type: ignore" comment [unused-ignore] +posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "DateTime | Date | datetime | date | str | float | int | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type] +posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "str | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type] +posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "DateTime | Date | datetime | date | str | float | int | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type] +posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "str | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type] +posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "DateTime | Date | datetime | date | str | float | int | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type] +posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Item "None" of "DateTime | None" has no attribute "int_timestamp" [union-attr] +posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "str | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type] posthog/queries/trends/test/test_person.py:0: error: "str" has no attribute "get" [attr-defined] posthog/queries/trends/test/test_person.py:0: error: Invalid index type "int" for "HttpResponse"; expected type "str | bytes" [index] posthog/queries/trends/test/test_person.py:0: error: "str" has no attribute "get" [attr-defined] diff --git a/posthog/migrations/0402_externaldatajob_schema.py b/posthog/migrations/0402_externaldatajob_schema.py new file mode 100644 index 0000000000000..93cd399aec2aa --- /dev/null +++ b/posthog/migrations/0402_externaldatajob_schema.py @@ -0,0 +1,25 @@ +# Generated by Django 4.1.13 on 2024-04-15 14:32 + +from django.db import migrations, models +import django.db.models.deletion + + +class Migration(migrations.Migration): + dependencies = [ + ("posthog", 
"0401_experiment_exposure_cohort"), + ] + + operations = [ + migrations.AddField( + model_name="externaldatajob", + name="schema", + field=models.ForeignKey( + blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to="posthog.externaldataschema" + ), + ), + migrations.AddField( + model_name="externaldataschema", + name="status", + field=models.CharField(max_length=400, null=True, blank=True), + ), + ] diff --git a/posthog/temporal/common/client.py b/posthog/temporal/common/client.py index 5794115c5cd18..87c9eb939f599 100644 --- a/posthog/temporal/common/client.py +++ b/posthog/temporal/common/client.py @@ -49,3 +49,16 @@ async def sync_connect() -> Client: settings.TEMPORAL_CLIENT_KEY, ) return client + + +async def async_connect() -> Client: + """Asynchronous connect to Temporal and return a Client.""" + client = await connect( + settings.TEMPORAL_HOST, + settings.TEMPORAL_PORT, + settings.TEMPORAL_NAMESPACE, + settings.TEMPORAL_CLIENT_ROOT_CA, + settings.TEMPORAL_CLIENT_CERT, + settings.TEMPORAL_CLIENT_KEY, + ) + return client diff --git a/posthog/temporal/common/schedule.py b/posthog/temporal/common/schedule.py index 7e5e4dbdb393a..7e2ae334a73bc 100644 --- a/posthog/temporal/common/schedule.py +++ b/posthog/temporal/common/schedule.py @@ -12,6 +12,15 @@ async def create_schedule(temporal: Client, id: str, schedule: Schedule, trigger ) +async def a_create_schedule(temporal: Client, id: str, schedule: Schedule, trigger_immediately: bool = False): + """Async create a Temporal Schedule.""" + return await temporal.create_schedule( + id=id, + schedule=schedule, + trigger_immediately=trigger_immediately, + ) + + @async_to_sync async def update_schedule(temporal: Client, id: str, schedule: Schedule) -> None: """Update a Temporal Schedule.""" @@ -25,6 +34,18 @@ async def updater(_: ScheduleUpdateInput) -> ScheduleUpdate: ) +async def a_update_schedule(temporal: Client, id: str, schedule: Schedule) -> None: + """Async update a Temporal Schedule.""" + handle = temporal.get_schedule_handle(id) + + async def updater(_: ScheduleUpdateInput) -> ScheduleUpdate: + return ScheduleUpdate(schedule=schedule) + + return await handle.update( + updater=updater, + ) + + @async_to_sync async def unpause_schedule(temporal: Client, schedule_id: str, note: str | None = None) -> None: """Unpause a Temporal Schedule.""" @@ -39,6 +60,12 @@ async def delete_schedule(temporal: Client, schedule_id: str) -> None: await handle.delete() +async def a_delete_schedule(temporal: Client, schedule_id: str) -> None: + """Async delete a Temporal Schedule.""" + handle = temporal.get_schedule_handle(schedule_id) + await handle.delete() + + @async_to_sync async def describe_schedule(temporal: Client, schedule_id: str): """Describe a Temporal Schedule.""" @@ -55,6 +82,21 @@ async def pause_schedule(temporal: Client, schedule_id: str, note: str | None = @async_to_sync async def trigger_schedule(temporal: Client, schedule_id: str, note: str | None = None) -> None: - """Pause a Temporal Schedule.""" + """Trigger a Temporal Schedule.""" handle = temporal.get_schedule_handle(schedule_id) await handle.trigger() + + +async def a_trigger_schedule(temporal: Client, schedule_id: str, note: str | None = None) -> None: + """Trigger a Temporal Schedule.""" + handle = temporal.get_schedule_handle(schedule_id) + await handle.trigger() + + +async def a_schedule_exists(temporal: Client, schedule_id: str) -> bool: + """Check whether a schedule exists.""" + try: + await temporal.get_schedule_handle(schedule_id).describe() + return True + 
except: + return False diff --git a/posthog/temporal/data_imports/__init__.py b/posthog/temporal/data_imports/__init__.py index e4d5887f22d15..3259e91f002cf 100644 --- a/posthog/temporal/data_imports/__init__.py +++ b/posthog/temporal/data_imports/__init__.py @@ -1,18 +1,20 @@ from posthog.temporal.data_imports.external_data_job import ( ExternalDataJobWorkflow, - create_external_data_job_model, + create_external_data_job_model_activity, create_source_templates, - run_external_data_job, + import_data_activity, update_external_data_job_model, validate_schema_activity, + check_schedule_activity, ) WORKFLOWS = [ExternalDataJobWorkflow] ACTIVITIES = [ - create_external_data_job_model, + create_external_data_job_model_activity, update_external_data_job_model, - run_external_data_job, + import_data_activity, validate_schema_activity, create_source_templates, + check_schedule_activity, ] diff --git a/posthog/temporal/data_imports/external_data_job.py b/posthog/temporal/data_imports/external_data_job.py index 82e48a18e05c6..938ab423a0cbe 100644 --- a/posthog/temporal/data_imports/external_data_job.py +++ b/posthog/temporal/data_imports/external_data_job.py @@ -10,81 +10,32 @@ # TODO: remove dependency from posthog.temporal.batch_exports.base import PostHogWorkflow -from posthog.temporal.data_imports.pipelines.helpers import aupdate_job_count -from posthog.temporal.data_imports.pipelines.schemas import PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING -from posthog.temporal.data_imports.pipelines.zendesk.credentials import ZendeskCredentialsToken +from posthog.temporal.utils import ExternalDataWorkflowInputs +from posthog.temporal.data_imports.workflow_activities.create_job_model import ( + CreateExternalDataJobModelActivityInputs, + create_external_data_job_model_activity, +) +from posthog.temporal.data_imports.workflow_activities.import_data import ImportDataActivityInputs, import_data_activity +from posthog.warehouse.data_load.service import ( + a_delete_external_data_schedule, + a_external_data_workflow_exists, + a_sync_external_data_job_workflow, + a_trigger_external_data_workflow, +) from posthog.warehouse.data_load.source_templates import create_warehouse_templates_for_source from posthog.warehouse.data_load.validate_schema import validate_schema_and_update_table -from posthog.temporal.data_imports.pipelines.pipeline import DataImportPipeline, PipelineInputs from posthog.warehouse.external_data_source.jobs import ( - create_external_data_job, update_external_job_status, ) from posthog.warehouse.models import ( ExternalDataJob, get_active_schemas_for_source_id, - sync_old_schemas_with_new_schemas, ExternalDataSource, - get_external_data_job, + aget_schema_by_id, ) -from posthog.warehouse.models.external_data_schema import get_postgres_schemas from posthog.temporal.common.logger import bind_temporal_worker_logger -from posthog.utils import get_instance_region -from typing import Dict, Tuple -import asyncio -from django.conf import settings -from django.utils import timezone - - -@dataclasses.dataclass -class CreateExternalDataJobInputs: - team_id: int - external_data_source_id: uuid.UUID - - -@activity.defn -async def create_external_data_job_model(inputs: CreateExternalDataJobInputs) -> Tuple[str, list[Tuple[str, str]]]: - run = await sync_to_async(create_external_data_job)( - team_id=inputs.team_id, - external_data_source_id=inputs.external_data_source_id, - workflow_id=activity.info().workflow_id, - ) - - source = await sync_to_async(ExternalDataSource.objects.get)( - team_id=inputs.team_id, 
id=inputs.external_data_source_id - ) - source.status = "Running" - await sync_to_async(source.save)() - - if source.source_type == ExternalDataSource.Type.POSTGRES: - host = source.job_inputs.get("host") - port = source.job_inputs.get("port") - user = source.job_inputs.get("user") - password = source.job_inputs.get("password") - database = source.job_inputs.get("database") - schema = source.job_inputs.get("schema") - schemas_to_sync = await sync_to_async(get_postgres_schemas)(host, port, database, user, password, schema) - else: - schemas_to_sync = list(PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING.get(source.source_type, ())) - - await sync_to_async(sync_old_schemas_with_new_schemas)( # type: ignore - schemas_to_sync, - source_id=inputs.external_data_source_id, - team_id=inputs.team_id, - ) - - schemas = await sync_to_async(get_active_schemas_for_source_id)( - team_id=inputs.team_id, source_id=inputs.external_data_source_id - ) - - logger = await bind_temporal_worker_logger(team_id=inputs.team_id) - - logger.info( - f"Created external data job with for external data source {inputs.external_data_source_id}", - ) - - return str(run.id), schemas +from typing import Dict @dataclasses.dataclass @@ -115,7 +66,7 @@ async def update_external_data_job_model(inputs: UpdateExternalDataJobStatusInpu class ValidateSchemaInputs: run_id: str team_id: int - schemas: list[Tuple[str, str]] + schema_id: uuid.UUID table_schema: TSchemaTables table_row_counts: Dict[str, int] @@ -125,7 +76,7 @@ async def validate_schema_activity(inputs: ValidateSchemaInputs) -> None: await validate_schema_and_update_table( run_id=inputs.run_id, team_id=inputs.team_id, - schemas=inputs.schemas, + schema_id=inputs.schema_id, table_schema=inputs.table_schema, table_row_counts=inputs.table_row_counts, ) @@ -147,141 +98,36 @@ async def create_source_templates(inputs: CreateSourceTemplateInputs) -> None: await create_warehouse_templates_for_source(team_id=inputs.team_id, run_id=inputs.run_id) -@dataclasses.dataclass -class ExternalDataWorkflowInputs: - team_id: int - external_data_source_id: uuid.UUID - - -@dataclasses.dataclass -class ExternalDataJobInputs: - team_id: int - source_id: uuid.UUID - run_id: str - schemas: list[Tuple[str, str]] - - @activity.defn -async def run_external_data_job(inputs: ExternalDataJobInputs) -> Tuple[TSchemaTables, Dict[str, int]]: # noqa: F821 - model: ExternalDataJob = await get_external_data_job( - job_id=inputs.run_id, - ) - +async def check_schedule_activity(inputs: ExternalDataWorkflowInputs) -> bool: logger = await bind_temporal_worker_logger(team_id=inputs.team_id) - job_inputs = PipelineInputs( - source_id=inputs.source_id, - schemas=inputs.schemas, - run_id=inputs.run_id, - team_id=inputs.team_id, - job_type=model.pipeline.source_type, - dataset_name=model.folder_path, - ) - - endpoints = [schema[1] for schema in inputs.schemas] - - source = None - if model.pipeline.source_type == ExternalDataSource.Type.STRIPE: - from posthog.temporal.data_imports.pipelines.stripe.helpers import stripe_source - - stripe_secret_key = model.pipeline.job_inputs.get("stripe_secret_key", None) - account_id = model.pipeline.job_inputs.get("stripe_account_id", None) - # Cludge: account_id should be checked here too but can deal with nulls - # until we require re update of account_ids in stripe so they're all store - if not stripe_secret_key: - raise ValueError(f"Stripe secret key not found for job {model.id}") - - # Hacky just for specific user - region = get_instance_region() - if region == "EU" and inputs.team_id == 
11870: - prev_day = timezone.now() - dt.timedelta(days=1) - start_date = prev_day.replace(hour=0, minute=0, second=0, microsecond=0) - end_date = start_date + dt.timedelta(1) - else: - start_date = None - end_date = None - - source = stripe_source( - api_key=stripe_secret_key, - account_id=account_id, - endpoints=tuple(endpoints), - team_id=inputs.team_id, - job_id=inputs.run_id, - start_date=start_date, - end_date=end_date, + # Creates schedules for all schemas if they don't exist yet, and then remove itself as a source schedule + if inputs.external_data_schema_id is None: + logger.info("Schema ID is none, creating schedules for schemas...") + schemas = await get_active_schemas_for_source_id( + team_id=inputs.team_id, source_id=inputs.external_data_source_id ) - elif model.pipeline.source_type == ExternalDataSource.Type.HUBSPOT: - from posthog.temporal.data_imports.pipelines.hubspot.auth import refresh_access_token - from posthog.temporal.data_imports.pipelines.hubspot import hubspot - - hubspot_access_code = model.pipeline.job_inputs.get("hubspot_secret_key", None) - refresh_token = model.pipeline.job_inputs.get("hubspot_refresh_token", None) - if not refresh_token: - raise ValueError(f"Hubspot refresh token not found for job {model.id}") - - if not hubspot_access_code: - hubspot_access_code = refresh_access_token(refresh_token) - - source = hubspot( - api_key=hubspot_access_code, - refresh_token=refresh_token, - endpoints=tuple(endpoints), - ) - elif model.pipeline.source_type == ExternalDataSource.Type.POSTGRES: - from posthog.temporal.data_imports.pipelines.postgres import postgres_source - - host = model.pipeline.job_inputs.get("host") - port = model.pipeline.job_inputs.get("port") - user = model.pipeline.job_inputs.get("user") - password = model.pipeline.job_inputs.get("password") - database = model.pipeline.job_inputs.get("database") - schema = model.pipeline.job_inputs.get("schema") - - source = postgres_source( - host=host, - port=port, - user=user, - password=password, - database=database, - sslmode="prefer" if settings.TEST or settings.DEBUG else "require", - schema=schema, - table_names=endpoints, - ) - elif model.pipeline.source_type == ExternalDataSource.Type.ZENDESK: - from posthog.temporal.data_imports.pipelines.zendesk.helpers import zendesk_support - - credentials = ZendeskCredentialsToken() - credentials.token = model.pipeline.job_inputs.get("zendesk_api_key") - credentials.subdomain = model.pipeline.job_inputs.get("zendesk_subdomain") - credentials.email = model.pipeline.job_inputs.get("zendesk_email_address") - - data_support = zendesk_support(credentials=credentials, endpoints=tuple(endpoints), team_id=inputs.team_id) - # Uncomment to support zendesk chat and talk - # data_chat = zendesk_chat() - # data_talk = zendesk_talk() - - source = data_support - else: - raise ValueError(f"Source type {model.pipeline.source_type} not supported") - - # Temp background heartbeat for now - async def heartbeat() -> None: - while True: - await asyncio.sleep(10) - activity.heartbeat() - - heartbeat_task = asyncio.create_task(heartbeat()) + for schema in schemas: + if await a_external_data_workflow_exists(schema.id): + await a_trigger_external_data_workflow(schema) + logger.info(f"Schedule exists for schema {schema.id}. 
Triggered schedule") + else: + await a_sync_external_data_job_workflow(schema, create=True) + logger.info(f"Created schedule for schema {schema.id}") + # Delete the source schedule in favour of the schema schedules + await a_delete_external_data_schedule(ExternalDataSource(id=inputs.external_data_source_id)) + logger.info(f"Deleted schedule for source {inputs.external_data_source_id}") + return True - try: - table_row_counts = await DataImportPipeline(job_inputs, source, logger).run() - total_rows_synced = sum(table_row_counts.values()) + schema_model = await aget_schema_by_id(inputs.external_data_schema_id, inputs.team_id) - await aupdate_job_count(inputs.run_id, inputs.team_id, total_rows_synced) - finally: - heartbeat_task.cancel() - await asyncio.wait([heartbeat_task]) + # schema turned off so don't sync + if schema_model and not schema_model.should_sync: + return True - return source.schema.tables, table_row_counts + logger.info("Schema ID is set. Continuing...") + return False # TODO: update retry policies @@ -296,14 +142,30 @@ def parse_inputs(inputs: list[str]) -> ExternalDataWorkflowInputs: async def run(self, inputs: ExternalDataWorkflowInputs): logger = await bind_temporal_worker_logger(team_id=inputs.team_id) + should_exit = await workflow.execute_activity( + check_schedule_activity, + inputs, + start_to_close_timeout=dt.timedelta(minutes=1), + retry_policy=RetryPolicy( + initial_interval=dt.timedelta(seconds=10), + maximum_interval=dt.timedelta(seconds=60), + maximum_attempts=0, + non_retryable_error_types=["NotNullViolation", "IntegrityError"], + ), + ) + + if should_exit: + return + + assert inputs.external_data_schema_id is not None + # create external data job and trigger activity - create_external_data_job_inputs = CreateExternalDataJobInputs( - team_id=inputs.team_id, - external_data_source_id=inputs.external_data_source_id, + create_external_data_job_inputs = CreateExternalDataJobModelActivityInputs( + team_id=inputs.team_id, schema_id=inputs.external_data_schema_id, source_id=inputs.external_data_source_id ) - run_id, schemas = await workflow.execute_activity( - create_external_data_job_model, + run_id = await workflow.execute_activity( + create_external_data_job_model_activity, create_external_data_job_inputs, start_to_close_timeout=dt.timedelta(minutes=1), retry_policy=RetryPolicy( @@ -319,15 +181,15 @@ async def run(self, inputs: ExternalDataWorkflowInputs): ) try: - job_inputs = ExternalDataJobInputs( - source_id=inputs.external_data_source_id, + job_inputs = ImportDataActivityInputs( team_id=inputs.team_id, run_id=run_id, - schemas=schemas, + schema_id=inputs.external_data_schema_id, + source_id=inputs.external_data_source_id, ) table_schemas, table_row_counts = await workflow.execute_activity( - run_external_data_job, + import_data_activity, job_inputs, start_to_close_timeout=dt.timedelta(hours=30), retry_policy=RetryPolicy(maximum_attempts=5), @@ -338,7 +200,7 @@ async def run(self, inputs: ExternalDataWorkflowInputs): validate_inputs = ValidateSchemaInputs( run_id=run_id, team_id=inputs.team_id, - schemas=schemas, + schema_id=inputs.external_data_schema_id, table_schema=table_schemas, table_row_counts=table_row_counts, ) diff --git a/posthog/temporal/data_imports/pipelines/pipeline.py b/posthog/temporal/data_imports/pipelines/pipeline.py index 920f3eba88d23..d91ce311808a5 100644 --- a/posthog/temporal/data_imports/pipelines/pipeline.py +++ b/posthog/temporal/data_imports/pipelines/pipeline.py @@ -17,7 +17,7 @@ class PipelineInputs: source_id: UUID run_id: str - 
schemas: list[tuple[str, str]] + schema_id: UUID dataset_name: str job_type: str team_id: int @@ -68,13 +68,6 @@ def _create_pipeline(self): dataset_name=self.inputs.dataset_name, ) - def _get_schemas(self): - if not self.inputs.schemas: - self.logger.info(f"No schemas found for source id {self.inputs.source_id}") - return None - - return self.inputs.schemas - def _run(self) -> Dict[str, int]: pipeline = self._create_pipeline() pipeline.run(self.source, loader_file_format=self.loader_file_format) @@ -86,10 +79,6 @@ def _run(self) -> Dict[str, int]: return dict(filtered_rows) async def run(self) -> Dict[str, int]: - schemas = self._get_schemas() - if not schemas: - return {} - try: return await asyncio.to_thread(self._run) except PipelineStepFailed: diff --git a/posthog/temporal/data_imports/pipelines/postgres/helpers.py b/posthog/temporal/data_imports/pipelines/postgres/helpers.py index 7d45a6df7e302..a288205063f15 100644 --- a/posthog/temporal/data_imports/pipelines/postgres/helpers.py +++ b/posthog/temporal/data_imports/pipelines/postgres/helpers.py @@ -117,8 +117,8 @@ class SqlDatabaseTableConfiguration(BaseConfiguration): class SqlTableResourceConfiguration(BaseConfiguration): credentials: ConnectionStringCredentials table: str - incremental: Optional[dlt.sources.incremental] = None schema: Optional[str] + incremental: Optional[dlt.sources.incremental] = None __source_name__ = "sql_database" diff --git a/posthog/temporal/data_imports/pipelines/zendesk/credentials.py b/posthog/temporal/data_imports/pipelines/zendesk/credentials.py index 1f8110ae9b911..e4dfda2013573 100644 --- a/posthog/temporal/data_imports/pipelines/zendesk/credentials.py +++ b/posthog/temporal/data_imports/pipelines/zendesk/credentials.py @@ -13,7 +13,7 @@ class ZendeskCredentialsBase(CredentialsConfiguration): The Base version of all the ZendeskCredential classes. 
""" - subdomain: str + subdomain: str = "" __config_gen_annotations__: ClassVar[List[str]] = [] @@ -23,7 +23,7 @@ class ZendeskCredentialsEmailPass(ZendeskCredentialsBase): This class is used to store credentials for Email + Password Authentication """ - email: str + email: str = "" password: TSecretValue @@ -42,7 +42,7 @@ class ZendeskCredentialsToken(ZendeskCredentialsBase): This class is used to store credentials for Token Authentication """ - email: str + email: str = "" token: TSecretValue diff --git a/posthog/temporal/data_imports/workflow_activities/create_job_model.py b/posthog/temporal/data_imports/workflow_activities/create_job_model.py new file mode 100644 index 0000000000000..d03e4173e2de5 --- /dev/null +++ b/posthog/temporal/data_imports/workflow_activities/create_job_model.py @@ -0,0 +1,68 @@ +import dataclasses +import uuid + +from asgiref.sync import sync_to_async +from temporalio import activity + +# TODO: remove dependency +from posthog.temporal.data_imports.pipelines.schemas import PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING + +from posthog.warehouse.external_data_source.jobs import ( + create_external_data_job, +) +from posthog.warehouse.models import ( + sync_old_schemas_with_new_schemas, + ExternalDataSource, +) +from posthog.warehouse.models.external_data_schema import ExternalDataSchema, get_postgres_schemas +from posthog.temporal.common.logger import bind_temporal_worker_logger + + +@dataclasses.dataclass +class CreateExternalDataJobModelActivityInputs: + team_id: int + schema_id: uuid.UUID + source_id: uuid.UUID + + +@activity.defn +async def create_external_data_job_model_activity(inputs: CreateExternalDataJobModelActivityInputs) -> str: + run = await sync_to_async(create_external_data_job)( + team_id=inputs.team_id, + external_data_source_id=inputs.source_id, + external_data_schema_id=inputs.schema_id, + workflow_id=activity.info().workflow_id, + ) + + schema = await sync_to_async(ExternalDataSchema.objects.get)(team_id=inputs.team_id, id=inputs.schema_id) + schema.status = ExternalDataSchema.Status.RUNNING + await sync_to_async(schema.save)() + + source = await sync_to_async(ExternalDataSource.objects.get)(team_id=inputs.team_id, id=inputs.source_id) + + if source.source_type == ExternalDataSource.Type.POSTGRES: + host = source.job_inputs.get("host") + port = source.job_inputs.get("port") + user = source.job_inputs.get("user") + password = source.job_inputs.get("password") + database = source.job_inputs.get("database") + schema = source.job_inputs.get("schema") + schemas_to_sync = await sync_to_async(get_postgres_schemas)(host, port, database, user, password, schema) + else: + schemas_to_sync = list(PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING.get(source.source_type, ())) + + # TODO: this could cause a race condition where each schema worker creates the missing schema + + await sync_to_async(sync_old_schemas_with_new_schemas)( # type: ignore + schemas_to_sync, + source_id=inputs.source_id, + team_id=inputs.team_id, + ) + + logger = await bind_temporal_worker_logger(team_id=inputs.team_id) + + logger.info( + f"Created external data job with for external data source {inputs.source_id}", + ) + + return str(run.id) diff --git a/posthog/temporal/data_imports/workflow_activities/import_data.py b/posthog/temporal/data_imports/workflow_activities/import_data.py new file mode 100644 index 0000000000000..bd2a9d4cf85ba --- /dev/null +++ b/posthog/temporal/data_imports/workflow_activities/import_data.py @@ -0,0 +1,159 @@ +import dataclasses +import datetime as dt +import uuid + +from 
dlt.common.schema.typing import TSchemaTables +from temporalio import activity + +# TODO: remove dependency +from posthog.temporal.data_imports.pipelines.helpers import aupdate_job_count +from posthog.temporal.data_imports.pipelines.zendesk.credentials import ZendeskCredentialsToken + +from posthog.temporal.data_imports.pipelines.pipeline import DataImportPipeline, PipelineInputs +from posthog.utils import get_instance_region +from posthog.warehouse.models import ( + ExternalDataJob, + ExternalDataSource, + get_external_data_job, +) +from posthog.temporal.common.logger import bind_temporal_worker_logger +from typing import Dict, Tuple +import asyncio +from django.conf import settings +from django.utils import timezone + +from posthog.warehouse.models.external_data_schema import ExternalDataSchema, aget_schema_by_id + + +@dataclasses.dataclass +class ImportDataActivityInputs: + team_id: int + schema_id: uuid.UUID + source_id: uuid.UUID + run_id: str + + +@activity.defn +async def import_data_activity(inputs: ImportDataActivityInputs) -> Tuple[TSchemaTables, Dict[str, int]]: # noqa: F821 + model: ExternalDataJob = await get_external_data_job( + job_id=inputs.run_id, + ) + + logger = await bind_temporal_worker_logger(team_id=inputs.team_id) + + job_inputs = PipelineInputs( + source_id=inputs.source_id, + schema_id=inputs.schema_id, + run_id=inputs.run_id, + team_id=inputs.team_id, + job_type=model.pipeline.source_type, + dataset_name=model.folder_path, + ) + + schema: ExternalDataSchema = await aget_schema_by_id(inputs.schema_id, inputs.team_id) + + endpoints = [schema.name] + + source = None + if model.pipeline.source_type == ExternalDataSource.Type.STRIPE: + from posthog.temporal.data_imports.pipelines.stripe.helpers import stripe_source + + stripe_secret_key = model.pipeline.job_inputs.get("stripe_secret_key", None) + account_id = model.pipeline.job_inputs.get("stripe_account_id", None) + # Cludge: account_id should be checked here too but can deal with nulls + # until we require re update of account_ids in stripe so they're all store + if not stripe_secret_key: + raise ValueError(f"Stripe secret key not found for job {model.id}") + + # Hacky just for specific user + region = get_instance_region() + if region == "EU" and inputs.team_id == 11870: + prev_day = timezone.now() - dt.timedelta(days=1) + start_date = prev_day.replace(hour=0, minute=0, second=0, microsecond=0) + end_date = start_date + dt.timedelta(1) + else: + start_date = None + end_date = None + + source = stripe_source( + api_key=stripe_secret_key, + account_id=account_id, + endpoints=tuple(endpoints), + team_id=inputs.team_id, + job_id=inputs.run_id, + start_date=start_date, + end_date=end_date, + ) + elif model.pipeline.source_type == ExternalDataSource.Type.HUBSPOT: + from posthog.temporal.data_imports.pipelines.hubspot.auth import refresh_access_token + from posthog.temporal.data_imports.pipelines.hubspot import hubspot + + hubspot_access_code = model.pipeline.job_inputs.get("hubspot_secret_key", None) + refresh_token = model.pipeline.job_inputs.get("hubspot_refresh_token", None) + if not refresh_token: + raise ValueError(f"Hubspot refresh token not found for job {model.id}") + + if not hubspot_access_code: + hubspot_access_code = refresh_access_token(refresh_token) + + source = hubspot( + api_key=hubspot_access_code, + refresh_token=refresh_token, + endpoints=tuple(endpoints), + ) + elif model.pipeline.source_type == ExternalDataSource.Type.POSTGRES: + from posthog.temporal.data_imports.pipelines.postgres import 
postgres_source + + host = model.pipeline.job_inputs.get("host") + port = model.pipeline.job_inputs.get("port") + user = model.pipeline.job_inputs.get("user") + password = model.pipeline.job_inputs.get("password") + database = model.pipeline.job_inputs.get("database") + pg_schema = model.pipeline.job_inputs.get("schema") + + source = postgres_source( + host=host, + port=port, + user=user, + password=password, + database=database, + sslmode="prefer" if settings.TEST or settings.DEBUG else "require", + schema=pg_schema, + table_names=endpoints, + ) + elif model.pipeline.source_type == ExternalDataSource.Type.ZENDESK: + from posthog.temporal.data_imports.pipelines.zendesk.helpers import zendesk_support + + # NOTE: this line errors on CI mypy but not locally. Putting arguments within the function causes the opposite error + credentials = ZendeskCredentialsToken() + credentials.token = model.pipeline.job_inputs.get("zendesk_api_key") + credentials.subdomain = model.pipeline.job_inputs.get("zendesk_subdomain") + credentials.email = model.pipeline.job_inputs.get("zendesk_email_address") + + data_support = zendesk_support(credentials=credentials, endpoints=tuple(endpoints), team_id=inputs.team_id) + # Uncomment to support zendesk chat and talk + # data_chat = zendesk_chat() + # data_talk = zendesk_talk() + + source = data_support + else: + raise ValueError(f"Source type {model.pipeline.source_type} not supported") + + # Temp background heartbeat for now + async def heartbeat() -> None: + while True: + await asyncio.sleep(10) + activity.heartbeat() + + heartbeat_task = asyncio.create_task(heartbeat()) + + try: + table_row_counts = await DataImportPipeline(job_inputs, source, logger).run() + total_rows_synced = sum(table_row_counts.values()) + + await aupdate_job_count(inputs.run_id, inputs.team_id, total_rows_synced) + finally: + heartbeat_task.cancel() + await asyncio.wait([heartbeat_task]) + + return source.schema.tables, table_row_counts diff --git a/posthog/temporal/tests/external_data/test_external_data_job.py b/posthog/temporal/tests/external_data/test_external_data_job.py index 7603a1a98d47e..44470724a9c5f 100644 --- a/posthog/temporal/tests/external_data/test_external_data_job.py +++ b/posthog/temporal/tests/external_data/test_external_data_job.py @@ -6,21 +6,23 @@ from django.test import override_settings from posthog.temporal.data_imports.external_data_job import ( - CreateExternalDataJobInputs, UpdateExternalDataJobStatusInputs, ValidateSchemaInputs, - create_external_data_job, - create_external_data_job_model, + check_schedule_activity, create_source_templates, - run_external_data_job, update_external_data_job_model, validate_schema_activity, ) from posthog.temporal.data_imports.external_data_job import ( ExternalDataJobWorkflow, - ExternalDataJobInputs, ExternalDataWorkflowInputs, ) +from posthog.temporal.data_imports.workflow_activities.create_job_model import ( + CreateExternalDataJobModelActivityInputs, + create_external_data_job_model_activity, +) +from posthog.temporal.data_imports.workflow_activities.import_data import ImportDataActivityInputs, import_data_activity +from posthog.warehouse.external_data_source.jobs import create_external_data_job from posthog.warehouse.models import ( get_latest_run_if_exists, DataWarehouseTable, @@ -146,13 +148,16 @@ async def test_create_external_job_activity(activity_environment, team, **kwargs source_type="Stripe", ) - inputs = CreateExternalDataJobInputs(team_id=team.id, external_data_source_id=new_source.pk) + test_1_schema = await 
_create_schema("test-1", new_source, team) - run_id, schemas = await activity_environment.run(create_external_data_job_model, inputs) + inputs = CreateExternalDataJobModelActivityInputs( + team_id=team.id, source_id=new_source.pk, schema_id=test_1_schema.id + ) + + run_id = await activity_environment.run(create_external_data_job_model_activity, inputs) runs = ExternalDataJob.objects.filter(id=run_id) assert await sync_to_async(runs.exists)() - assert len(schemas) == 0 @pytest.mark.django_db(transaction=True) @@ -167,26 +172,18 @@ async def test_create_external_job_activity_schemas_exist(activity_environment, source_type="Stripe", ) - await sync_to_async(ExternalDataSchema.objects.create)( + schema = await sync_to_async(ExternalDataSchema.objects.create)( name=PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING[new_source.source_type][0], team_id=team.id, source_id=new_source.pk, ) - await sync_to_async(ExternalDataSchema.objects.create)( - name=PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING[new_source.source_type][1], - team_id=team.id, - source_id=new_source.pk, - should_sync=False, - ) - - inputs = CreateExternalDataJobInputs(team_id=team.id, external_data_source_id=new_source.pk) + inputs = CreateExternalDataJobModelActivityInputs(team_id=team.id, source_id=new_source.pk, schema_id=schema.id) - run_id, schemas = await activity_environment.run(create_external_data_job_model, inputs) + run_id = await activity_environment.run(create_external_data_job_model_activity, inputs) runs = ExternalDataJob.objects.filter(id=run_id) assert await sync_to_async(runs.exists)() - assert len(schemas) == 1 @pytest.mark.django_db(transaction=True) @@ -201,22 +198,16 @@ async def test_create_external_job_activity_update_schemas(activity_environment, source_type="Stripe", ) - await sync_to_async(ExternalDataSchema.objects.create)( + schema = await sync_to_async(ExternalDataSchema.objects.create)( name=PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING[new_source.source_type][0], team_id=team.id, source_id=new_source.pk, should_sync=True, ) - await sync_to_async(ExternalDataSchema.objects.create)( - name=PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING[new_source.source_type][1], - team_id=team.id, - source_id=new_source.pk, - ) - - inputs = CreateExternalDataJobInputs(team_id=team.id, external_data_source_id=new_source.pk) + inputs = CreateExternalDataJobModelActivityInputs(team_id=team.id, source_id=new_source.pk, schema_id=schema.id) - run_id, schemas = await activity_environment.run(create_external_data_job_model, inputs) + run_id = await activity_environment.run(create_external_data_job_model_activity, inputs) runs = ExternalDataJob.objects.filter(id=run_id) assert await sync_to_async(runs.exists)() @@ -241,8 +232,18 @@ async def test_update_external_job_activity(activity_environment, team, **kwargs source_type="Stripe", ) + schema = await sync_to_async(ExternalDataSchema.objects.create)( + name=PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING[new_source.source_type][0], + team_id=team.id, + source_id=new_source.pk, + should_sync=True, + ) + new_job = await sync_to_async(create_external_data_job)( - team_id=team.id, external_data_source_id=new_source.pk, workflow_id=activity_environment.info.workflow_id + team_id=team.id, + external_data_source_id=new_source.pk, + workflow_id=activity_environment.info.workflow_id, + external_data_schema_id=schema.id, ) inputs = UpdateExternalDataJobStatusInputs( @@ -255,8 +256,10 @@ async def test_update_external_job_activity(activity_environment, team, **kwargs await activity_environment.run(update_external_data_job_model, inputs) 
await sync_to_async(new_job.refresh_from_db)() + await sync_to_async(schema.refresh_from_db)() assert new_job.status == ExternalDataJob.Status.COMPLETED + assert schema.status == ExternalDataJob.Status.COMPLETED @pytest.mark.django_db(transaction=True) @@ -283,13 +286,12 @@ async def setup_job_1(): new_job = await sync_to_async(ExternalDataJob.objects.filter(id=new_job.id).prefetch_related("pipeline").get)() customer_schema = await _create_schema("Customer", new_source, team) - schemas = [(customer_schema.id, "Customer")] - inputs = ExternalDataJobInputs( + inputs = ImportDataActivityInputs( team_id=team.id, run_id=new_job.pk, source_id=new_source.pk, - schemas=schemas, + schema_id=customer_schema.id, ) return new_job, inputs @@ -314,14 +316,13 @@ async def setup_job_2(): new_job = await sync_to_async(ExternalDataJob.objects.filter(id=new_job.id).prefetch_related("pipeline").get)() - customer_schema = await _create_schema("Customer", new_source, team) invoice_schema = await _create_schema("Invoice", new_source, team) - schemas = [(customer_schema.id, "Customer"), (invoice_schema.id, "Invoice")] - inputs = ExternalDataJobInputs( + + inputs = ImportDataActivityInputs( team_id=team.id, run_id=new_job.pk, source_id=new_source.pk, - schemas=schemas, + schema_id=invoice_schema.id, ) return new_job, inputs @@ -356,26 +357,18 @@ async def setup_job_2(): "has_more": False, } await asyncio.gather( - activity_environment.run(run_external_data_job, job_1_inputs), - activity_environment.run(run_external_data_job, job_2_inputs), + activity_environment.run(import_data_activity, job_1_inputs), + activity_environment.run(import_data_activity, job_2_inputs), ) job_1_customer_objects = await minio_client.list_objects_v2( Bucket=BUCKET_NAME, Prefix=f"{job_1.folder_path}/customer/" ) - job_1_invoice_objects = await minio_client.list_objects_v2( - Bucket=BUCKET_NAME, Prefix=f"{job_1.folder_path}/invoice/" - ) assert len(job_1_customer_objects["Contents"]) == 1 - assert job_1_invoice_objects.get("Contents", None) is None - job_2_customer_objects = await minio_client.list_objects_v2( - Bucket=BUCKET_NAME, Prefix=f"{job_2.folder_path}/customer/" - ) job_2_invoice_objects = await minio_client.list_objects_v2( Bucket=BUCKET_NAME, Prefix=f"{job_2.folder_path}/invoice/" ) - assert len(job_2_customer_objects["Contents"]) == 1 assert len(job_2_invoice_objects["Contents"]) == 1 @@ -405,12 +398,12 @@ async def setup_job_1(): new_job = await sync_to_async(ExternalDataJob.objects.filter(id=new_job.id).prefetch_related("pipeline").get)() customer_schema = await _create_schema("Customer", new_source, team) - schemas = [(customer_schema.id, "Customer")] - inputs = ExternalDataJobInputs( + + inputs = ImportDataActivityInputs( team_id=team.id, run_id=new_job.pk, source_id=new_source.pk, - schemas=schemas, + schema_id=customer_schema.id, ) return new_job, inputs @@ -432,7 +425,7 @@ async def setup_job_1(): "has_more": True, } await asyncio.gather( - activity_environment.run(run_external_data_job, job_1_inputs), + activity_environment.run(import_data_activity, job_1_inputs), ) job_1_customer_objects = await minio_client.list_objects_v2( @@ -470,12 +463,12 @@ async def setup_job_1(): new_job = await sync_to_async(ExternalDataJob.objects.filter(id=new_job.id).prefetch_related("pipeline").get)() customer_schema = await _create_schema("Customer", new_source, team) - schemas = [(customer_schema.id, "Customer")] - inputs = ExternalDataJobInputs( + + inputs = ImportDataActivityInputs( team_id=team.id, run_id=new_job.pk, 
source_id=new_source.pk, - schemas=schemas, + schema_id=customer_schema.id, ) return new_job, inputs @@ -499,7 +492,7 @@ async def setup_job_1(): "has_more": False, } await asyncio.gather( - activity_environment.run(run_external_data_job, job_1_inputs), + activity_environment.run(import_data_activity, job_1_inputs), ) job_1_customer_objects = await minio_client.list_objects_v2( @@ -533,17 +526,6 @@ async def test_validate_schema_and_update_table_activity(activity_environment, t ) test_1_schema = await _create_schema("test-1", new_source, team) - test_2_schema = await _create_schema("test-2", new_source, team) - test_3_schema = await _create_schema("test-3", new_source, team) - test_4_schema = await _create_schema("test-4", new_source, team) - test_5_schema = await _create_schema("test-5", new_source, team) - schemas = [ - (test_1_schema.id, "test-1"), - (test_2_schema.id, "test-2"), - (test_3_schema.id, "test-3"), - (test_4_schema.id, "test-4"), - (test_5_schema.id, "test-5"), - ] with mock.patch( "posthog.warehouse.models.table.DataWarehouseTable.get_columns" @@ -554,21 +536,17 @@ async def test_validate_schema_and_update_table_activity(activity_environment, t ValidateSchemaInputs( run_id=new_job.pk, team_id=team.id, - schemas=schemas, + schema_id=test_1_schema.id, table_schema={ "test-1": {"name": "test-1", "resource": "test-1", "columns": {"id": {"data_type": "text"}}}, - "test-2": {"name": "test-2", "resource": "test-2", "columns": {"id": {"data_type": "text"}}}, - "test-3": {"name": "test-3", "resource": "test-3", "columns": {"id": {"data_type": "text"}}}, - "test-4": {"name": "test-4", "resource": "test-4", "columns": {"id": {"data_type": "text"}}}, - "test-5": {"name": "test-5", "resource": "test-5", "columns": {"id": {"data_type": "text"}}}, }, table_row_counts={}, ), ) - assert mock_get_columns.call_count == 10 + assert mock_get_columns.call_count == 2 assert ( - await sync_to_async(DataWarehouseTable.objects.filter(external_data_source_id=new_source.pk).count)() == 5 + await sync_to_async(DataWarehouseTable.objects.filter(external_data_source_id=new_source.pk).count)() == 1 ) @@ -618,17 +596,6 @@ async def test_validate_schema_and_update_table_activity_with_existing(activity_ ) test_1_schema = await _create_schema("test-1", new_source, team, table_id=existing_table.id) - test_2_schema = await _create_schema("test-2", new_source, team) - test_3_schema = await _create_schema("test-3", new_source, team) - test_4_schema = await _create_schema("test-4", new_source, team) - test_5_schema = await _create_schema("test-5", new_source, team) - schemas = [ - (test_1_schema.id, "test-1"), - (test_2_schema.id, "test-2"), - (test_3_schema.id, "test-3"), - (test_4_schema.id, "test-4"), - (test_5_schema.id, "test-5"), - ] with mock.patch( "posthog.warehouse.models.table.DataWarehouseTable.get_columns" @@ -639,21 +606,17 @@ async def test_validate_schema_and_update_table_activity_with_existing(activity_ ValidateSchemaInputs( run_id=new_job.pk, team_id=team.id, - schemas=schemas, + schema_id=test_1_schema.id, table_schema={ "test-1": {"name": "test-1", "resource": "test-1", "columns": {"id": {"data_type": "text"}}}, - "test-2": {"name": "test-2", "resource": "test-2", "columns": {"id": {"data_type": "text"}}}, - "test-3": {"name": "test-3", "resource": "test-3", "columns": {"id": {"data_type": "text"}}}, - "test-4": {"name": "test-4", "resource": "test-4", "columns": {"id": {"data_type": "text"}}}, - "test-5": {"name": "test-5", "resource": "test-5", "columns": {"id": {"data_type": "text"}}}, }, 
table_row_counts={}, ), ) - assert mock_get_columns.call_count == 10 + assert mock_get_columns.call_count == 2 assert ( - await sync_to_async(DataWarehouseTable.objects.filter(external_data_source_id=new_source.pk).count)() == 5 + await sync_to_async(DataWarehouseTable.objects.filter(external_data_source_id=new_source.pk).count)() == 1 ) @@ -699,34 +662,27 @@ async def test_validate_schema_and_update_table_activity_half_run(activity_envir ] broken_schema = await _create_schema("broken_schema", new_source, team) - test_schema = await _create_schema("test_schema", new_source, team) - schemas = [(broken_schema.id, "broken_schema"), (test_schema.id, "test_schema")] await activity_environment.run( validate_schema_activity, ValidateSchemaInputs( run_id=new_job.pk, team_id=team.id, - schemas=schemas, + schema_id=broken_schema.id, table_schema={ "broken_schema": { "name": "broken_schema", "resource": "broken_schema", "columns": {"id": {"data_type": "text"}}, }, - "test_schema": { - "name": "test_schema", - "resource": "test_schema", - "columns": {"id": {"data_type": "text"}}, - }, }, table_row_counts={}, ), ) - assert mock_get_columns.call_count == 1 + assert mock_get_columns.call_count == 0 assert ( - await sync_to_async(DataWarehouseTable.objects.filter(external_data_source_id=new_source.pk).count)() == 1 + await sync_to_async(DataWarehouseTable.objects.filter(external_data_source_id=new_source.pk).count)() == 0 ) @@ -751,17 +707,6 @@ async def test_create_schema_activity(activity_environment, team, **kwargs): ) test_1_schema = await _create_schema("test-1", new_source, team) - test_2_schema = await _create_schema("test-2", new_source, team) - test_3_schema = await _create_schema("test-3", new_source, team) - test_4_schema = await _create_schema("test-4", new_source, team) - test_5_schema = await _create_schema("test-5", new_source, team) - schemas = [ - (test_1_schema.id, "test-1"), - (test_2_schema.id, "test-2"), - (test_3_schema.id, "test-3"), - (test_4_schema.id, "test-4"), - (test_5_schema.id, "test-5"), - ] with mock.patch( "posthog.warehouse.models.table.DataWarehouseTable.get_columns" @@ -772,30 +717,25 @@ async def test_create_schema_activity(activity_environment, team, **kwargs): ValidateSchemaInputs( run_id=new_job.pk, team_id=team.id, - schemas=schemas, + schema_id=test_1_schema.id, table_schema={ "test-1": {"name": "test-1", "resource": "test-1", "columns": {"id": {"data_type": "text"}}}, - "test-2": {"name": "test-2", "resource": "test-2", "columns": {"id": {"data_type": "text"}}}, - "test-3": {"name": "test-3", "resource": "test-3", "columns": {"id": {"data_type": "text"}}}, - "test-4": {"name": "test-4", "resource": "test-4", "columns": {"id": {"data_type": "text"}}}, - "test-5": {"name": "test-5", "resource": "test-5", "columns": {"id": {"data_type": "text"}}}, }, table_row_counts={}, ), ) - assert mock_get_columns.call_count == 10 + assert mock_get_columns.call_count == 2 all_tables = DataWarehouseTable.objects.all() table_length = await sync_to_async(len)(all_tables) - assert table_length == 5 + assert table_length == 1 @pytest.mark.django_db(transaction=True) @pytest.mark.asyncio -async def test_external_data_job_workflow_blank(team, **kwargs): +async def test_external_data_job_workflow_with_schema(team, **kwargs): """ - Test workflow with no schema. - Smoke test for making sure all activities run. + Test workflow with schema. 
""" new_source = await sync_to_async(ExternalDataSource.objects.create)( source_id=uuid.uuid4(), @@ -807,71 +747,19 @@ async def test_external_data_job_workflow_blank(team, **kwargs): job_inputs={"stripe_secret_key": "test-key"}, ) - workflow_id = str(uuid.uuid4()) - inputs = ExternalDataWorkflowInputs( + schema = await sync_to_async(ExternalDataSchema.objects.create)( + name="Customer", team_id=team.id, - external_data_source_id=new_source.pk, - ) - - with override_settings(AIRBYTE_BUCKET_KEY="test-key", AIRBYTE_BUCKET_SECRET="test-secret"): - with mock.patch.dict(PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING, {ExternalDataSource.Type.STRIPE: ()}): - async with await WorkflowEnvironment.start_time_skipping() as activity_environment: - async with Worker( - activity_environment.client, - task_queue=DATA_WAREHOUSE_TASK_QUEUE, - workflows=[ExternalDataJobWorkflow], - activities=[ - create_external_data_job_model, - update_external_data_job_model, - run_external_data_job, - validate_schema_activity, - create_source_templates, - ], - workflow_runner=UnsandboxedWorkflowRunner(), - ): - await activity_environment.client.execute_workflow( - ExternalDataJobWorkflow.run, - inputs, - id=workflow_id, - task_queue=DATA_WAREHOUSE_TASK_QUEUE, - retry_policy=RetryPolicy(maximum_attempts=1), - ) - - run = await get_latest_run_if_exists(team_id=team.pk, pipeline_id=new_source.pk) - assert run is not None - assert run.status == ExternalDataJob.Status.COMPLETED - - -@pytest.mark.django_db(transaction=True) -@pytest.mark.asyncio -async def test_external_data_job_workflow_with_schema(team, **kwargs): - """ - Test workflow with schema. - """ - new_source = await sync_to_async(ExternalDataSource.objects.create)( - source_id=uuid.uuid4(), - connection_id=uuid.uuid4(), - destination_id=uuid.uuid4(), - team=team, - status="running", - source_type="Stripe", - job_inputs={"stripe_secret_key": "test-key"}, + source_id=new_source.pk, ) workflow_id = str(uuid.uuid4()) inputs = ExternalDataWorkflowInputs( team_id=team.id, external_data_source_id=new_source.pk, + external_data_schema_id=schema.id, ) - schemas = PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING[new_source.source_type] - for schema in schemas: - await sync_to_async(ExternalDataSchema.objects.create)( - name=schema, - team_id=team.id, - source_id=new_source.pk, - ) - async def mock_async_func(inputs): return {} @@ -885,9 +773,10 @@ async def mock_async_func(inputs): task_queue=DATA_WAREHOUSE_TASK_QUEUE, workflows=[ExternalDataJobWorkflow], activities=[ - create_external_data_job_model, + check_schedule_activity, + create_external_data_job_model_activity, update_external_data_job_model, - run_external_data_job, + import_data_activity, validate_schema_activity, create_source_templates, ], @@ -906,9 +795,7 @@ async def mock_async_func(inputs): assert run is not None assert run.status == ExternalDataJob.Status.COMPLETED - assert await sync_to_async(DataWarehouseTable.objects.filter(external_data_source_id=new_source.pk).count)() == len( - PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING[new_source.source_type] - ) + assert await sync_to_async(DataWarehouseTable.objects.filter(external_data_source_id=new_source.pk).count)() == 1 @pytest.mark.django_db(transaction=True) @@ -952,12 +839,9 @@ async def setup_job_1(): new_job = await sync_to_async(ExternalDataJob.objects.filter(id=new_job.id).prefetch_related("pipeline").get)() posthog_test_schema = await _create_schema("posthog_test", new_source, team) - schemas = [(posthog_test_schema.id, "posthog_test")] - inputs = ExternalDataJobInputs( - 
team_id=team.id, - run_id=new_job.pk, - source_id=new_source.pk, - schemas=schemas, + + inputs = ImportDataActivityInputs( + team_id=team.id, run_id=new_job.pk, source_id=new_source.pk, schema_id=posthog_test_schema.id ) return new_job, inputs @@ -970,10 +854,117 @@ async def setup_job_1(): AIRBYTE_BUCKET_SECRET=settings.OBJECT_STORAGE_SECRET_ACCESS_KEY, ): await asyncio.gather( - activity_environment.run(run_external_data_job, job_1_inputs), + activity_environment.run(import_data_activity, job_1_inputs), ) job_1_team_objects = await minio_client.list_objects_v2( Bucket=BUCKET_NAME, Prefix=f"{job_1.folder_path}/posthog_test/" ) assert len(job_1_team_objects["Contents"]) == 1 + + +@pytest.mark.django_db(transaction=True) +@pytest.mark.asyncio +async def test_check_schedule_activity_with_schema_id(activity_environment, team, **kwargs): + new_source = await sync_to_async(ExternalDataSource.objects.create)( + source_id=uuid.uuid4(), + connection_id=uuid.uuid4(), + destination_id=uuid.uuid4(), + team=team, + status="running", + source_type="Stripe", + job_inputs={"stripe_secret_key": "test-key"}, + ) + + test_1_schema = await _create_schema("test-1", new_source, team) + + should_exit = await activity_environment.run( + check_schedule_activity, + ExternalDataWorkflowInputs( + team_id=team.id, + external_data_source_id=new_source.id, + external_data_schema_id=test_1_schema.id, + ), + ) + + assert should_exit is False + + +@pytest.mark.django_db(transaction=True) +@pytest.mark.asyncio +async def test_check_schedule_activity_with_missing_schema_id_but_with_schedule(activity_environment, team, **kwargs): + new_source = await sync_to_async(ExternalDataSource.objects.create)( + source_id=uuid.uuid4(), + connection_id=uuid.uuid4(), + destination_id=uuid.uuid4(), + team=team, + status="running", + source_type="Stripe", + job_inputs={"stripe_secret_key": "test-key"}, + ) + + await sync_to_async(ExternalDataSchema.objects.create)( + name="test-1", + team_id=team.id, + source_id=new_source.pk, + should_sync=True, + ) + + with mock.patch( + "posthog.temporal.data_imports.external_data_job.a_external_data_workflow_exists", return_value=True + ), mock.patch( + "posthog.temporal.data_imports.external_data_job.a_delete_external_data_schedule", return_value=True + ), mock.patch( + "posthog.temporal.data_imports.external_data_job.a_trigger_external_data_workflow" + ) as mock_a_trigger_external_data_workflow: + should_exit = await activity_environment.run( + check_schedule_activity, + ExternalDataWorkflowInputs( + team_id=team.id, + external_data_source_id=new_source.id, + external_data_schema_id=None, + ), + ) + + assert should_exit is True + assert mock_a_trigger_external_data_workflow.call_count == 1 + + +@pytest.mark.django_db(transaction=True) +@pytest.mark.asyncio +async def test_check_schedule_activity_with_missing_schema_id_and_no_schedule(activity_environment, team, **kwargs): + new_source = await sync_to_async(ExternalDataSource.objects.create)( + source_id=uuid.uuid4(), + connection_id=uuid.uuid4(), + destination_id=uuid.uuid4(), + team=team, + status="running", + source_type="Stripe", + job_inputs={"stripe_secret_key": "test-key"}, + ) + + await sync_to_async(ExternalDataSchema.objects.create)( + name="test-1", + team_id=team.id, + source_id=new_source.pk, + should_sync=True, + ) + + with mock.patch( + "posthog.temporal.data_imports.external_data_job.a_external_data_workflow_exists", return_value=False + ), mock.patch( + "posthog.temporal.data_imports.external_data_job.a_delete_external_data_schedule", 
return_value=True + ), mock.patch( + "posthog.temporal.data_imports.external_data_job.a_sync_external_data_job_workflow" + ) as mock_a_sync_external_data_job_workflow: + should_exit = await activity_environment.run( + check_schedule_activity, + ExternalDataWorkflowInputs( + team_id=team.id, + external_data_source_id=new_source.id, + external_data_schema_id=None, + ), + ) + + assert should_exit is True + assert mock_a_sync_external_data_job_workflow.call_count == 1 diff --git a/posthog/temporal/utils.py b/posthog/temporal/utils.py new file mode 100644 index 0000000000000..fa96af6442489 --- /dev/null +++ b/posthog/temporal/utils.py @@ -0,0 +1,10 @@ +import dataclasses +import uuid + + +# Dataclass living here to avoid circular reference +@dataclasses.dataclass +class ExternalDataWorkflowInputs: + team_id: int + external_data_source_id: uuid.UUID + external_data_schema_id: uuid.UUID | None = None diff --git a/posthog/warehouse/api/external_data_source.py b/posthog/warehouse/api/external_data_source.py index d1c766eb2b2eb..744797bd430b4 100644 --- a/posthog/warehouse/api/external_data_source.py +++ b/posthog/warehouse/api/external_data_source.py @@ -17,6 +17,7 @@ cancel_external_data_workflow, delete_data_import_folder, is_any_external_data_job_paused, + trigger_external_data_source_workflow, ) from posthog.warehouse.models import ExternalDataSource, ExternalDataSchema, ExternalDataJob from posthog.warehouse.api.external_data_schema import ExternalDataSchemaSerializer @@ -41,6 +42,7 @@ class ExternalDataSourceSerializers(serializers.ModelSerializer): account_id = serializers.CharField(write_only=True) client_secret = serializers.CharField(write_only=True) last_run_at = serializers.SerializerMethodField(read_only=True) + status = serializers.SerializerMethodField(read_only=True) schemas = serializers.SerializerMethodField(read_only=True) class Meta: @@ -68,6 +70,28 @@ def get_last_run_at(self, instance: ExternalDataSource) -> str: return latest_completed_run.created_at if latest_completed_run else None + def get_status(self, instance: ExternalDataSource) -> str: + active_schemas: List[ExternalDataSchema] = list(instance.schemas.filter(should_sync=True).all()) + any_failures = any(schema.status == ExternalDataSchema.Status.ERROR for schema in active_schemas) + any_cancelled = any(schema.status == ExternalDataSchema.Status.CANCELLED for schema in active_schemas) + any_paused = any(schema.status == ExternalDataSchema.Status.PAUSED for schema in active_schemas) + any_running = any(schema.status == ExternalDataSchema.Status.RUNNING for schema in active_schemas) + any_completed = any(schema.status == ExternalDataSchema.Status.COMPLETED for schema in active_schemas) + + if any_failures: + return ExternalDataSchema.Status.ERROR + elif any_cancelled: + return ExternalDataSchema.Status.CANCELLED + elif any_paused: + return ExternalDataSchema.Status.PAUSED + elif any_running: + return ExternalDataSchema.Status.RUNNING + elif any_completed: + return ExternalDataSchema.Status.COMPLETED + else: + # Fallback during migration phase of going from source -> schema as the source of truth for syncs + return instance.status + def get_schemas(self, instance: ExternalDataSource): schemas = instance.schemas.order_by("name").all() return ExternalDataSchemaSerializer(schemas, many=True, read_only=True, context=self.context).data @@ -169,13 +193,20 @@ def create(self, request: Request, *args: Any, **kwargs: Any) -> Response: disabled_schemas = [schema for schema in default_schemas if schema not in enabled_schemas] + 
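The `get_status` serializer method above boils down to a fixed precedence over the per-schema statuses, with the deprecated source-level status as a fallback. As a rough standalone sketch (illustrative only, assuming the string values of the `ExternalDataSchema.Status` choices added further down in this patch, and not code from the patch itself), the same rule could be written as:

from typing import Iterable, Optional

# Highest-priority status wins; fall back to the deprecated source-level status
# when no active schema reports anything.
STATUS_PRECEDENCE = ["Error", "Cancelled", "Paused", "Running", "Completed"]


def resolve_source_status(schema_statuses: Iterable[Optional[str]], fallback: str) -> str:
    seen = {status for status in schema_statuses if status is not None}
    for status in STATUS_PRECEDENCE:
        if status in seen:
            return status
    return fallback


# A running schema outranks completed ones; no active schemas falls back to the source status.
assert resolve_source_status(["Completed", "Running"], fallback="Completed") == "Running"
assert resolve_source_status([], fallback="Completed") == "Completed"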
active_schemas: List[ExternalDataSchema] = [] + for schema in enabled_schemas: - ExternalDataSchema.objects.create(name=schema, team=self.team, source=new_source_model, should_sync=True) + active_schemas.append( + ExternalDataSchema.objects.create( + name=schema, team=self.team, source=new_source_model, should_sync=True + ) + ) for schema in disabled_schemas: ExternalDataSchema.objects.create(name=schema, team=self.team, source=new_source_model, should_sync=False) try: - sync_external_data_job_workflow(new_source_model, create=True) + for active_schema in active_schemas: + sync_external_data_job_workflow(active_schema, create=True) except Exception as e: # Log error but don't fail because the source model was already created logger.exception("Could not trigger external data job", exc_info=e) @@ -331,7 +362,12 @@ def destroy(self, request: Request, *args: Any, **kwargs: Any) -> Response: ) pass - delete_external_data_schedule(instance) + for schema in ExternalDataSchema.objects.filter( + team_id=self.team_id, source_id=instance.id, should_sync=True + ).all(): + delete_external_data_schedule(str(schema.id)) + + delete_external_data_schedule(str(instance.id)) return super().destroy(request, *args, **kwargs) @action(methods=["POST"], detail=True) @@ -345,12 +381,23 @@ def reload(self, request: Request, *args: Any, **kwargs: Any): ) try: - trigger_external_data_workflow(instance) + trigger_external_data_source_workflow(instance) + + except temporalio.service.RPCError: + # if the source schedule has been removed - trigger the schema schedules + for schema in ExternalDataSchema.objects.filter( + team_id=self.team_id, source_id=instance.id, should_sync=True + ).all(): + try: + trigger_external_data_workflow(schema) + except temporalio.service.RPCError as e: + # schedule doesn't exist + if e.message == "sql: no rows in result set": + sync_external_data_job_workflow(schema, create=True) + + except Exception as e: + logger.exception(f"Could not trigger external data job for schema {schema.name}", exc_info=e) - except temporalio.service.RPCError as e: - # schedule doesn't exist - if e.message == "sql: no rows in result set": - sync_external_data_job_workflow(instance, create=True) except Exception as e: logger.exception("Could not trigger external data job", exc_info=e) raise diff --git a/posthog/warehouse/data_load/service.py b/posthog/warehouse/data_load/service.py index 957197147d476..31fc1aab6baff 100644 --- a/posthog/warehouse/data_load/service.py +++ b/posthog/warehouse/data_load/service.py @@ -12,18 +12,21 @@ ) from posthog.constants import DATA_WAREHOUSE_TASK_QUEUE -from posthog.temporal.common.client import sync_connect +from posthog.temporal.common.client import async_connect, sync_connect from posthog.temporal.common.schedule import ( + a_create_schedule, + a_delete_schedule, + a_trigger_schedule, + a_update_schedule, create_schedule, pause_schedule, + a_schedule_exists, trigger_schedule, update_schedule, delete_schedule, unpause_schedule, ) -from posthog.temporal.data_imports.external_data_job import ( - ExternalDataWorkflowInputs, -) +from posthog.temporal.utils import ExternalDataWorkflowInputs from posthog.warehouse.models import ExternalDataSource import temporalio from temporalio.client import Client as TemporalClient @@ -32,48 +35,86 @@ from django.conf import settings import s3fs +from posthog.warehouse.models.external_data_schema import ExternalDataSchema -def sync_external_data_job_workflow( - external_data_source: ExternalDataSource, create: bool = False -) -> ExternalDataSource: 
- temporal = sync_connect() + +def get_sync_schedule(external_data_schema: ExternalDataSchema): inputs = ExternalDataWorkflowInputs( - team_id=external_data_source.team.id, - external_data_source_id=external_data_source.pk, + team_id=external_data_schema.team_id, + external_data_schema_id=external_data_schema.id, + external_data_source_id=external_data_schema.source_id, ) - schedule = Schedule( + return Schedule( action=ScheduleActionStartWorkflow( "external-data-job", asdict(inputs), - id=str(external_data_source.pk), + id=str(external_data_schema.id), task_queue=str(DATA_WAREHOUSE_TASK_QUEUE), ), spec=ScheduleSpec( intervals=[ ScheduleIntervalSpec( - every=timedelta(hours=24), offset=timedelta(hours=external_data_source.created_at.hour) + every=timedelta(hours=24), offset=timedelta(hours=external_data_schema.created_at.hour) ) ], jitter=timedelta(hours=2), ), - state=ScheduleState(note=f"Schedule for external data source: {external_data_source.pk}"), + state=ScheduleState(note=f"Schedule for external data source: {external_data_schema.pk}"), policy=SchedulePolicy(overlap=ScheduleOverlapPolicy.SKIP), ) + +def sync_external_data_job_workflow( + external_data_schema: ExternalDataSchema, create: bool = False +) -> ExternalDataSchema: + temporal = sync_connect() + + schedule = get_sync_schedule(external_data_schema) + + if create: + create_schedule(temporal, id=str(external_data_schema.id), schedule=schedule, trigger_immediately=True) + else: + update_schedule(temporal, id=str(external_data_schema.id), schedule=schedule) + + return external_data_schema + + +async def a_sync_external_data_job_workflow( + external_data_schema: ExternalDataSchema, create: bool = False +) -> ExternalDataSchema: + temporal = await async_connect() + + schedule = get_sync_schedule(external_data_schema) + if create: - create_schedule(temporal, id=str(external_data_source.id), schedule=schedule, trigger_immediately=True) + await a_create_schedule(temporal, id=str(external_data_schema.id), schedule=schedule, trigger_immediately=True) else: - update_schedule(temporal, id=str(external_data_source.id), schedule=schedule) + await a_update_schedule(temporal, id=str(external_data_schema.id), schedule=schedule) - return external_data_source + return external_data_schema -def trigger_external_data_workflow(external_data_source: ExternalDataSource): +def trigger_external_data_source_workflow(external_data_source: ExternalDataSource): temporal = sync_connect() trigger_schedule(temporal, schedule_id=str(external_data_source.id)) +def trigger_external_data_workflow(external_data_schema: ExternalDataSchema): + temporal = sync_connect() + trigger_schedule(temporal, schedule_id=str(external_data_schema.id)) + + +async def a_trigger_external_data_workflow(external_data_schema: ExternalDataSchema): + temporal = await async_connect() + await a_trigger_schedule(temporal, schedule_id=str(external_data_schema.id)) + + +async def a_external_data_workflow_exists(id: str) -> bool: + temporal = await async_connect() + return await a_schedule_exists(temporal, schedule_id=id) + + def pause_external_data_schedule(external_data_source: ExternalDataSource): temporal = sync_connect() pause_schedule(temporal, schedule_id=str(external_data_source.id)) @@ -84,10 +125,21 @@ def unpause_external_data_schedule(external_data_source: ExternalDataSource): unpause_schedule(temporal, schedule_id=str(external_data_source.id)) -def delete_external_data_schedule(external_data_source: ExternalDataSource): +def delete_external_data_schedule(schedule_id: str): 
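Taken together with the `check_schedule_activity` tests earlier in this diff, the async helpers above imply roughly the following flow: exit early when a schema id is already attached to the workflow inputs, otherwise fan a legacy source-level run out to per-schema schedules. The sketch below is an assumption of that behaviour for orientation only; the actual activity lives in `posthog/temporal/data_imports/external_data_job.py` and may differ in detail.

from posthog.temporal.utils import ExternalDataWorkflowInputs
from posthog.warehouse.data_load.service import (
    a_external_data_workflow_exists,
    a_sync_external_data_job_workflow,
    a_trigger_external_data_workflow,
)
from posthog.warehouse.models.external_data_schema import get_active_schemas_for_source_id


async def check_schedule_sketch(inputs: ExternalDataWorkflowInputs) -> bool:
    """Return True when the workflow should exit after delegating to per-schema schedules."""
    # New-style run: a schema id is attached, so the workflow carries on with the import.
    if inputs.external_data_schema_id is not None:
        return False

    # Legacy source-level run: hand each active schema its own schedule, then exit.
    # (The real activity also retires the old source-level schedule via
    # a_delete_external_data_schedule, which the tests mock out.)
    schemas = await get_active_schemas_for_source_id(
        source_id=inputs.external_data_source_id, team_id=inputs.team_id
    )
    for schema in schemas:
        if await a_external_data_workflow_exists(str(schema.id)):
            await a_trigger_external_data_workflow(schema)
        else:
            await a_sync_external_data_job_workflow(schema, create=True)

    return True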
temporal = sync_connect() try: - delete_schedule(temporal, schedule_id=str(external_data_source.id)) + delete_schedule(temporal, schedule_id=schedule_id) + except temporalio.service.RPCError as e: + # Swallow error if schedule does not exist already + if e.status == temporalio.service.RPCStatusCode.NOT_FOUND: + return + raise + + +async def a_delete_external_data_schedule(external_data_source: ExternalDataSource): + temporal = await async_connect() + try: + await a_delete_schedule(temporal, schedule_id=str(external_data_source.id)) except temporalio.service.RPCError as e: # Swallow error if schedule does not exist already if e.status == temporalio.service.RPCStatusCode.NOT_FOUND: return diff --git a/posthog/warehouse/data_load/validate_schema.py b/posthog/warehouse/data_load/validate_schema.py index 44190a42d3d2e..dcfbb69595aa3 100644 --- a/posthog/warehouse/data_load/validate_schema.py +++ b/posthog/warehouse/data_load/validate_schema.py @@ -1,3 +1,4 @@ +import uuid from django.conf import settings from dlt.common.schema.typing import TSchemaTables from dlt.common.data_types.typing import TDataType @@ -26,8 +27,9 @@ from posthog.temporal.common.logger import bind_temporal_worker_logger from clickhouse_driver.errors import ServerException from asgiref.sync import sync_to_async -from typing import Dict, Tuple, Type +from typing import Dict, Type from posthog.utils import camel_to_snake_case +from posthog.warehouse.models.external_data_schema import ExternalDataSchema def dlt_to_hogql_type(dlt_type: TDataType | None) -> str: @@ -91,7 +93,7 @@ async def validate_schema( async def validate_schema_and_update_table( run_id: str, team_id: int, - schemas: list[Tuple[str, str]], + schema_id: uuid.UUID, table_schema: TSchemaTables, table_row_counts: Dict[str, int], ) -> None: @@ -103,51 +105,40 @@ async def validate_schema_and_update_table( Arguments: run_id: The id of the external data job team_id: The id of the team - schemas: The list of schemas that have been synced by the external data job + schema_id: The schema that this data job relates to + table_schema: The DLT schema from the data load stage + table_row_counts: The per-table counts of rows synced by DLT """ logger = await bind_temporal_worker_logger(team_id=team_id) job: ExternalDataJob = await get_external_data_job(job_id=run_id) - last_successful_job: ExternalDataJob | None = await get_latest_run_if_exists(job.team_id, job.pipeline_id) + last_successful_job: ExternalDataJob | None = await get_latest_run_if_exists(team_id, job.pipeline_id) credential: DataWarehouseCredential = await get_or_create_datawarehouse_credential( - team_id=job.team_id, + team_id=team_id, access_key=settings.AIRBYTE_BUCKET_KEY, access_secret=settings.AIRBYTE_BUCKET_SECRET, ) - for _schema in schemas: - _schema_id = _schema[0] - _schema_name = _schema[1] + external_data_schema: ExternalDataSchema = await aget_schema_by_id(schema_id, team_id) - table_name = f"{job.pipeline.prefix or ''}{job.pipeline.source_type}_{_schema_name}".lower() - new_url_pattern = job.url_pattern_by_schema(camel_to_snake_case(_schema_name)) - row_count = table_row_counts.get(_schema_name, 0) + _schema_id = external_data_schema.id + _schema_name: str = external_data_schema.name - # Check - try: - data = await validate_schema( - credential=credential, - table_name=table_name, - new_url_pattern=new_url_pattern, - team_id=team_id, - row_count=row_count, - ) - except ServerException as err: - if err.code == 636: - logger.exception( - f"Data Warehouse: No data for schema {_schema_name} for external data job 
{job.pk}", - exc_info=err, - ) - continue - except Exception as e: - # TODO: handle other exceptions here - logger.exception( - f"Data Warehouse: Could not validate schema for external data job {job.pk}", - exc_info=e, - ) - continue + table_name = f"{job.pipeline.prefix or ''}{job.pipeline.source_type}_{_schema_name}".lower() + new_url_pattern = job.url_pattern_by_schema(camel_to_snake_case(_schema_name)) + row_count = table_row_counts.get(_schema_name.lower(), 0) + + # Check + try: + data = await validate_schema( + credential=credential, + table_name=table_name, + new_url_pattern=new_url_pattern, + team_id=team_id, + row_count=row_count, + ) # create or update table_created = None @@ -190,13 +181,26 @@ async def validate_schema_and_update_table( await asave_datawarehousetable(table_created) # schema could have been deleted by this point - schema_model = await aget_schema_by_id(schema_id=_schema_id, team_id=job.team_id) + schema_model = await aget_schema_by_id(schema_id=_schema_id, team_id=team_id) if schema_model: schema_model.table = table_created schema_model.last_synced_at = job.created_at await asave_external_data_schema(schema_model) + except ServerException as err: + if err.code == 636: + logger.exception( + f"Data Warehouse: No data for schema {_schema_name} for external data job {job.pk}", + exc_info=err, + ) + except Exception as e: + # TODO: handle other exceptions here + logger.exception( + f"Data Warehouse: Could not validate schema for external data job {job.pk}", + exc_info=e, + ) + if last_successful_job: try: last_successful_job.delete_data_in_bucket() @@ -205,4 +209,3 @@ async def validate_schema_and_update_table( f"Data Warehouse: Could not delete deprecated data source {last_successful_job.pk}", exc_info=e, ) - pass diff --git a/posthog/warehouse/external_data_source/jobs.py b/posthog/warehouse/external_data_source/jobs.py index 7370615e9e3e7..bd8314150f70b 100644 --- a/posthog/warehouse/external_data_source/jobs.py +++ b/posthog/warehouse/external_data_source/jobs.py @@ -1,6 +1,7 @@ from uuid import UUID from posthog.warehouse.models.external_data_job import ExternalDataJob +from posthog.warehouse.models.external_data_schema import ExternalDataSchema from posthog.warehouse.models.external_data_source import ExternalDataSource @@ -10,12 +11,14 @@ def get_external_data_source(team_id: str, external_data_source_id: str) -> Exte def create_external_data_job( external_data_source_id: UUID, + external_data_schema_id: UUID, workflow_id: str, - team_id: str, + team_id: int, ) -> ExternalDataJob: job = ExternalDataJob.objects.create( team_id=team_id, pipeline_id=external_data_source_id, + schema_id=external_data_schema_id, status=ExternalDataJob.Status.RUNNING, rows_synced=0, workflow_id=workflow_id, @@ -24,15 +27,15 @@ def create_external_data_job( return job -def update_external_job_status(run_id: UUID, team_id: str, status: str, latest_error: str | None) -> ExternalDataJob: +def update_external_job_status(run_id: UUID, team_id: int, status: str, latest_error: str | None) -> ExternalDataJob: model = ExternalDataJob.objects.get(id=run_id, team_id=team_id) model.status = status model.latest_error = latest_error model.save() - pipeline = ExternalDataSource.objects.get(id=model.pipeline_id, team_id=team_id) - pipeline.status = status - pipeline.save() + schema = ExternalDataSchema.objects.get(id=model.schema_id, team_id=team_id) + schema.status = status + schema.save() model.refresh_from_db() diff --git a/posthog/warehouse/models/external_data_job.py 
b/posthog/warehouse/models/external_data_job.py index bb357d3ef7211..b095f8667d934 100644 --- a/posthog/warehouse/models/external_data_job.py +++ b/posthog/warehouse/models/external_data_job.py @@ -16,6 +16,9 @@ class Status(models.TextChoices): team: models.ForeignKey = models.ForeignKey(Team, on_delete=models.CASCADE) pipeline: models.ForeignKey = models.ForeignKey("posthog.ExternalDataSource", on_delete=models.CASCADE) + schema: models.ForeignKey = models.ForeignKey( + "posthog.ExternalDataSchema", on_delete=models.CASCADE, null=True, blank=True + ) status: models.CharField = models.CharField(max_length=400) rows_synced: models.BigIntegerField = models.BigIntegerField(null=True, blank=True) latest_error: models.TextField = models.TextField( diff --git a/posthog/warehouse/models/external_data_schema.py b/posthog/warehouse/models/external_data_schema.py index 8b1189131ae6d..add9350230593 100644 --- a/posthog/warehouse/models/external_data_schema.py +++ b/posthog/warehouse/models/external_data_schema.py @@ -10,6 +10,13 @@ class ExternalDataSchema(CreatedMetaFields, UUIDModel): + class Status(models.TextChoices): + RUNNING = "Running", "Running" + PAUSED = "Paused", "Paused" + ERROR = "Error", "Error" + COMPLETED = "Completed", "Completed" + CANCELLED = "Cancelled", "Cancelled" + name: models.CharField = models.CharField(max_length=400) team: models.ForeignKey = models.ForeignKey(Team, on_delete=models.CASCADE) source: models.ForeignKey = models.ForeignKey( @@ -22,6 +29,7 @@ class ExternalDataSchema(CreatedMetaFields, UUIDModel): latest_error: models.TextField = models.TextField( null=True, help_text="The latest error that occurred when syncing this schema." ) + status: models.CharField = models.CharField(max_length=400, null=True, blank=True) last_synced_at: models.DateTimeField = models.DateTimeField(null=True, blank=True) __repr__ = sane_repr("name") @@ -47,19 +55,20 @@ def aget_schema_by_id(schema_id: str, team_id: int) -> ExternalDataSchema | None return ExternalDataSchema.objects.get(id=schema_id, team_id=team_id) +@database_sync_to_async def get_active_schemas_for_source_id(source_id: uuid.UUID, team_id: int): - schemas = ExternalDataSchema.objects.filter(team_id=team_id, source_id=source_id, should_sync=True).values().all() - return [(val["id"], val["name"]) for val in schemas] + return list(ExternalDataSchema.objects.filter(team_id=team_id, source_id=source_id, should_sync=True).all()) def get_all_schemas_for_source_id(source_id: uuid.UUID, team_id: int): - schemas = ExternalDataSchema.objects.filter(team_id=team_id, source_id=source_id).values().all() - return [val["name"] for val in schemas] + return list(ExternalDataSchema.objects.filter(team_id=team_id, source_id=source_id).all()) def sync_old_schemas_with_new_schemas(new_schemas: list, source_id: uuid.UUID, team_id: int): old_schemas = get_all_schemas_for_source_id(source_id=source_id, team_id=team_id) - schemas_to_create = [schema for schema in new_schemas if schema not in old_schemas] + old_schemas_names = [schema.name for schema in old_schemas] + + schemas_to_create = [schema for schema in new_schemas if schema not in old_schemas_names] for schema in schemas_to_create: ExternalDataSchema.objects.create(name=schema, team_id=team_id, source_id=source_id, should_sync=False) diff --git a/posthog/warehouse/models/external_data_source.py b/posthog/warehouse/models/external_data_source.py index 0a044c0b06315..8d05766845c57 100644 --- a/posthog/warehouse/models/external_data_source.py +++ 
b/posthog/warehouse/models/external_data_source.py @@ -25,6 +25,8 @@ class Status(models.TextChoices): connection_id: models.CharField = models.CharField(max_length=400) destination_id: models.CharField = models.CharField(max_length=400, null=True, blank=True) team: models.ForeignKey = models.ForeignKey(Team, on_delete=models.CASCADE) + + # `status` is deprecated in favour of external_data_schema.status status: models.CharField = models.CharField(max_length=400) source_type: models.CharField = models.CharField(max_length=128, choices=Type.choices) job_inputs: encrypted_fields.fields.EncryptedJSONField = encrypted_fields.fields.EncryptedJSONField(