From 7ed7df3af3ac6de1d13e907c415a8f2a76bc4d66 Mon Sep 17 00:00:00 2001 From: Tom Owers Date: Thu, 11 Apr 2024 11:27:42 +0100 Subject: [PATCH 01/26] WIP --- latest_migrations.manifest | 2 +- .../migrations/0401_externaldatajob_schema.py | 20 ++ posthog/temporal/common/schedule.py | 11 +- posthog/temporal/data_imports/__init__.py | 8 +- .../data_imports/external_data_job.py | 235 +++--------------- .../data_imports/pipelines/pipeline.py | 13 +- .../workflow_activities/create_job_model.py | 66 +++++ .../workflow_activities/import_data.py | 142 +++++++++++ posthog/temporal/utils.py | 10 + posthog/warehouse/data_load/service.py | 37 +-- .../warehouse/data_load/validate_schema.py | 75 +++--- .../warehouse/external_data_source/jobs.py | 6 +- posthog/warehouse/models/external_data_job.py | 1 + .../warehouse/models/external_data_schema.py | 6 +- 14 files changed, 360 insertions(+), 272 deletions(-) create mode 100644 posthog/migrations/0401_externaldatajob_schema.py create mode 100644 posthog/temporal/data_imports/workflow_activities/create_job_model.py create mode 100644 posthog/temporal/data_imports/workflow_activities/import_data.py create mode 100644 posthog/temporal/utils.py diff --git a/latest_migrations.manifest b/latest_migrations.manifest index 644041805cebf..62d1e1f3da106 100644 --- a/latest_migrations.manifest +++ b/latest_migrations.manifest @@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name ee: 0016_rolemembership_organization_member otp_static: 0002_throttling otp_totp: 0002_auto_20190420_0723 -posthog: 0400_datawarehousetable_row_count +posthog: 0401_externaldatajob_schema sessions: 0001_initial social_django: 0010_uid_db_index two_factor: 0007_auto_20201201_1019 diff --git a/posthog/migrations/0401_externaldatajob_schema.py b/posthog/migrations/0401_externaldatajob_schema.py new file mode 100644 index 0000000000000..8b089ca97c22a --- /dev/null +++ b/posthog/migrations/0401_externaldatajob_schema.py @@ -0,0 +1,20 @@ +# Generated by Django 4.1.13 on 2024-04-10 15:12 + +from django.db import migrations, models +import django.db.models.deletion + + +class Migration(migrations.Migration): + dependencies = [ + ("posthog", "0400_datawarehousetable_row_count"), + ] + + operations = [ + migrations.AddField( + model_name="externaldatajob", + name="schema", + field=models.ForeignKey( + null=True, on_delete=django.db.models.deletion.CASCADE, to="posthog.externaldataschema" + ), + ), + ] diff --git a/posthog/temporal/common/schedule.py b/posthog/temporal/common/schedule.py index 7e5e4dbdb393a..f31e1ac1341c8 100644 --- a/posthog/temporal/common/schedule.py +++ b/posthog/temporal/common/schedule.py @@ -55,6 +55,15 @@ async def pause_schedule(temporal: Client, schedule_id: str, note: str | None = @async_to_sync async def trigger_schedule(temporal: Client, schedule_id: str, note: str | None = None) -> None: - """Pause a Temporal Schedule.""" + """Trigger a Temporal Schedule.""" handle = temporal.get_schedule_handle(schedule_id) await handle.trigger() + + +def schedule_exists(temporal: Client, schedule_id: str) -> bool: + """Check whether a schedule exists.""" + try: + temporal.get_schedule_handle(schedule_id) + return True + except: + return False diff --git a/posthog/temporal/data_imports/__init__.py b/posthog/temporal/data_imports/__init__.py index e4d5887f22d15..481af1e6d7b60 100644 --- a/posthog/temporal/data_imports/__init__.py +++ b/posthog/temporal/data_imports/__init__.py @@ -1,8 +1,8 @@ from posthog.temporal.data_imports.external_data_job import ( ExternalDataJobWorkflow, - 
create_external_data_job_model, + create_external_data_job_model_activity, create_source_templates, - run_external_data_job, + import_data_activity, update_external_data_job_model, validate_schema_activity, ) @@ -10,9 +10,9 @@ WORKFLOWS = [ExternalDataJobWorkflow] ACTIVITIES = [ - create_external_data_job_model, + create_external_data_job_model_activity, update_external_data_job_model, - run_external_data_job, + import_data_activity, validate_schema_activity, create_source_templates, ] diff --git a/posthog/temporal/data_imports/external_data_job.py b/posthog/temporal/data_imports/external_data_job.py index b88dc44759ece..bb8cfb5373390 100644 --- a/posthog/temporal/data_imports/external_data_job.py +++ b/posthog/temporal/data_imports/external_data_job.py @@ -10,79 +10,31 @@ # TODO: remove dependency from posthog.temporal.batch_exports.base import PostHogWorkflow -from posthog.temporal.data_imports.pipelines.helpers import aupdate_job_count -from posthog.temporal.data_imports.pipelines.schemas import PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING -from posthog.temporal.data_imports.pipelines.zendesk.credentials import ZendeskCredentialsToken +from posthog.temporal.utils import ExternalDataWorkflowInputs +from posthog.temporal.data_imports.workflow_activities.create_job_model import ( + CreateExternalDataJobModelActivityInputs, + create_external_data_job_model_activity, +) +from posthog.temporal.data_imports.workflow_activities.import_data import ImportDataActivityInputs, import_data_activity +from posthog.warehouse.data_load.service import ( + delete_external_data_schedule, + external_data_workflow_exists, + sync_external_data_job_workflow, + trigger_external_data_workflow, +) from posthog.warehouse.data_load.source_templates import create_warehouse_templates_for_source from posthog.warehouse.data_load.validate_schema import validate_schema_and_update_table -from posthog.temporal.data_imports.pipelines.pipeline import DataImportPipeline, PipelineInputs from posthog.warehouse.external_data_source.jobs import ( - create_external_data_job, update_external_job_status, ) from posthog.warehouse.models import ( ExternalDataJob, get_active_schemas_for_source_id, - sync_old_schemas_with_new_schemas, ExternalDataSource, - get_external_data_job, ) -from posthog.warehouse.models.external_data_schema import get_postgres_schemas from posthog.temporal.common.logger import bind_temporal_worker_logger -from typing import Dict, Tuple -import asyncio -from django.conf import settings - - -@dataclasses.dataclass -class CreateExternalDataJobInputs: - team_id: int - external_data_source_id: uuid.UUID - - -@activity.defn -async def create_external_data_job_model(inputs: CreateExternalDataJobInputs) -> Tuple[str, list[Tuple[str, str]]]: - run = await sync_to_async(create_external_data_job)( - team_id=inputs.team_id, - external_data_source_id=inputs.external_data_source_id, - workflow_id=activity.info().workflow_id, - ) - - source = await sync_to_async(ExternalDataSource.objects.get)( - team_id=inputs.team_id, id=inputs.external_data_source_id - ) - source.status = "Running" - await sync_to_async(source.save)() - - if source.source_type == ExternalDataSource.Type.POSTGRES: - host = source.job_inputs.get("host") - port = source.job_inputs.get("port") - user = source.job_inputs.get("user") - password = source.job_inputs.get("password") - database = source.job_inputs.get("database") - schema = source.job_inputs.get("schema") - schemas_to_sync = await sync_to_async(get_postgres_schemas)(host, port, database, user, password, 
schema) - else: - schemas_to_sync = list(PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING.get(source.source_type, ())) - - await sync_to_async(sync_old_schemas_with_new_schemas)( # type: ignore - schemas_to_sync, - source_id=inputs.external_data_source_id, - team_id=inputs.team_id, - ) - - schemas = await sync_to_async(get_active_schemas_for_source_id)( - team_id=inputs.team_id, source_id=inputs.external_data_source_id - ) - - logger = await bind_temporal_worker_logger(team_id=inputs.team_id) - - logger.info( - f"Created external data job with for external data source {inputs.external_data_source_id}", - ) - - return str(run.id), schemas +from typing import Dict @dataclasses.dataclass @@ -113,7 +65,7 @@ async def update_external_data_job_model(inputs: UpdateExternalDataJobStatusInpu class ValidateSchemaInputs: run_id: str team_id: int - schemas: list[Tuple[str, str]] + schema_id: uuid.UUID table_schema: TSchemaTables table_row_counts: Dict[str, int] @@ -123,7 +75,7 @@ async def validate_schema_activity(inputs: ValidateSchemaInputs) -> None: await validate_schema_and_update_table( run_id=inputs.run_id, team_id=inputs.team_id, - schemas=inputs.schemas, + schema_id=inputs.schema_id, table_schema=inputs.table_schema, table_row_counts=inputs.table_row_counts, ) @@ -145,130 +97,6 @@ async def create_source_templates(inputs: CreateSourceTemplateInputs) -> None: await create_warehouse_templates_for_source(team_id=inputs.team_id, run_id=inputs.run_id) -@dataclasses.dataclass -class ExternalDataWorkflowInputs: - team_id: int - external_data_source_id: uuid.UUID - - -@dataclasses.dataclass -class ExternalDataJobInputs: - team_id: int - source_id: uuid.UUID - run_id: str - schemas: list[Tuple[str, str]] - - -@activity.defn -async def run_external_data_job(inputs: ExternalDataJobInputs) -> Tuple[TSchemaTables, Dict[str, int]]: # noqa: F821 - model: ExternalDataJob = await get_external_data_job( - job_id=inputs.run_id, - ) - - logger = await bind_temporal_worker_logger(team_id=inputs.team_id) - - job_inputs = PipelineInputs( - source_id=inputs.source_id, - schemas=inputs.schemas, - run_id=inputs.run_id, - team_id=inputs.team_id, - job_type=model.pipeline.source_type, - dataset_name=model.folder_path, - ) - - endpoints = [schema[1] for schema in inputs.schemas] - - source = None - if model.pipeline.source_type == ExternalDataSource.Type.STRIPE: - from posthog.temporal.data_imports.pipelines.stripe.helpers import stripe_source - - stripe_secret_key = model.pipeline.job_inputs.get("stripe_secret_key", None) - account_id = model.pipeline.job_inputs.get("stripe_account_id", None) - # Cludge: account_id should be checked here too but can deal with nulls - # until we require re update of account_ids in stripe so they're all store - if not stripe_secret_key: - raise ValueError(f"Stripe secret key not found for job {model.id}") - source = stripe_source( - api_key=stripe_secret_key, - account_id=account_id, - endpoints=tuple(endpoints), - team_id=inputs.team_id, - job_id=inputs.run_id, - ) - elif model.pipeline.source_type == ExternalDataSource.Type.HUBSPOT: - from posthog.temporal.data_imports.pipelines.hubspot.auth import refresh_access_token - from posthog.temporal.data_imports.pipelines.hubspot import hubspot - - hubspot_access_code = model.pipeline.job_inputs.get("hubspot_secret_key", None) - refresh_token = model.pipeline.job_inputs.get("hubspot_refresh_token", None) - if not refresh_token: - raise ValueError(f"Hubspot refresh token not found for job {model.id}") - - if not hubspot_access_code: - hubspot_access_code = 
refresh_access_token(refresh_token) - - source = hubspot( - api_key=hubspot_access_code, - refresh_token=refresh_token, - endpoints=tuple(endpoints), - ) - elif model.pipeline.source_type == ExternalDataSource.Type.POSTGRES: - from posthog.temporal.data_imports.pipelines.postgres import postgres_source - - host = model.pipeline.job_inputs.get("host") - port = model.pipeline.job_inputs.get("port") - user = model.pipeline.job_inputs.get("user") - password = model.pipeline.job_inputs.get("password") - database = model.pipeline.job_inputs.get("database") - schema = model.pipeline.job_inputs.get("schema") - - source = postgres_source( - host=host, - port=port, - user=user, - password=password, - database=database, - sslmode="prefer" if settings.TEST or settings.DEBUG else "require", - schema=schema, - table_names=endpoints, - ) - elif model.pipeline.source_type == ExternalDataSource.Type.ZENDESK: - from posthog.temporal.data_imports.pipelines.zendesk.helpers import zendesk_support - - credentials = ZendeskCredentialsToken() - credentials.token = model.pipeline.job_inputs.get("zendesk_api_key") - credentials.subdomain = model.pipeline.job_inputs.get("zendesk_subdomain") - credentials.email = model.pipeline.job_inputs.get("zendesk_email_address") - - data_support = zendesk_support(credentials=credentials, endpoints=tuple(endpoints), team_id=inputs.team_id) - # Uncomment to support zendesk chat and talk - # data_chat = zendesk_chat() - # data_talk = zendesk_talk() - - source = data_support - else: - raise ValueError(f"Source type {model.pipeline.source_type} not supported") - - # Temp background heartbeat for now - async def heartbeat() -> None: - while True: - await asyncio.sleep(10) - activity.heartbeat() - - heartbeat_task = asyncio.create_task(heartbeat()) - - try: - table_row_counts = await DataImportPipeline(job_inputs, source, logger).run() - total_rows_synced = sum(table_row_counts.values()) - - await aupdate_job_count(inputs.run_id, inputs.team_id, total_rows_synced) - finally: - heartbeat_task.cancel() - await asyncio.wait([heartbeat_task]) - - return source.schema.tables, table_row_counts - - # TODO: update retry policies @workflow.defn(name="external-data-job") class ExternalDataJobWorkflow(PostHogWorkflow): @@ -279,16 +107,29 @@ def parse_inputs(inputs: list[str]) -> ExternalDataWorkflowInputs: @workflow.run async def run(self, inputs: ExternalDataWorkflowInputs): + # Creates schedules for all schemas if they don't exist yet, and then remove itself as a source schedule + if inputs.external_data_schema_id is None: + schemas = await sync_to_async(get_active_schemas_for_source_id)( + team_id=inputs.team_id, source_id=inputs.external_data_source_id + ) + for schema in schemas: + if external_data_workflow_exists(schema.id): + trigger_external_data_workflow(schema) + else: + sync_external_data_job_workflow(schema, create=True) + # Delete the source schedule in favour of the schema schedules + delete_external_data_schedule(ExternalDataSource(id=inputs.external_data_source_id)) + return + logger = await bind_temporal_worker_logger(team_id=inputs.team_id) # create external data job and trigger activity - create_external_data_job_inputs = CreateExternalDataJobInputs( - team_id=inputs.team_id, - external_data_source_id=inputs.external_data_source_id, + create_external_data_job_inputs = CreateExternalDataJobModelActivityInputs( + team_id=inputs.team_id, schema_id=inputs.external_data_schema_id, source_id=inputs.external_data_source_id ) - run_id, schemas = await workflow.execute_activity( - 
create_external_data_job_model, + run_id = await workflow.execute_activity( + create_external_data_job_model_activity, create_external_data_job_inputs, start_to_close_timeout=dt.timedelta(minutes=1), retry_policy=RetryPolicy( @@ -304,15 +145,15 @@ async def run(self, inputs: ExternalDataWorkflowInputs): ) try: - job_inputs = ExternalDataJobInputs( - source_id=inputs.external_data_source_id, + job_inputs = ImportDataActivityInputs( team_id=inputs.team_id, run_id=run_id, - schemas=schemas, + schema_id=inputs.external_data_schema_id, + source_id=inputs.external_data_source_id, ) table_schemas, table_row_counts = await workflow.execute_activity( - run_external_data_job, + import_data_activity, job_inputs, start_to_close_timeout=dt.timedelta(hours=30), retry_policy=RetryPolicy(maximum_attempts=5), @@ -323,7 +164,7 @@ async def run(self, inputs: ExternalDataWorkflowInputs): validate_inputs = ValidateSchemaInputs( run_id=run_id, team_id=inputs.team_id, - schemas=schemas, + schema_id=inputs.external_data_schema_id, table_schema=table_schemas, table_row_counts=table_row_counts, ) diff --git a/posthog/temporal/data_imports/pipelines/pipeline.py b/posthog/temporal/data_imports/pipelines/pipeline.py index 920f3eba88d23..d91ce311808a5 100644 --- a/posthog/temporal/data_imports/pipelines/pipeline.py +++ b/posthog/temporal/data_imports/pipelines/pipeline.py @@ -17,7 +17,7 @@ class PipelineInputs: source_id: UUID run_id: str - schemas: list[tuple[str, str]] + schema_id: UUID dataset_name: str job_type: str team_id: int @@ -68,13 +68,6 @@ def _create_pipeline(self): dataset_name=self.inputs.dataset_name, ) - def _get_schemas(self): - if not self.inputs.schemas: - self.logger.info(f"No schemas found for source id {self.inputs.source_id}") - return None - - return self.inputs.schemas - def _run(self) -> Dict[str, int]: pipeline = self._create_pipeline() pipeline.run(self.source, loader_file_format=self.loader_file_format) @@ -86,10 +79,6 @@ def _run(self) -> Dict[str, int]: return dict(filtered_rows) async def run(self) -> Dict[str, int]: - schemas = self._get_schemas() - if not schemas: - return {} - try: return await asyncio.to_thread(self._run) except PipelineStepFailed: diff --git a/posthog/temporal/data_imports/workflow_activities/create_job_model.py b/posthog/temporal/data_imports/workflow_activities/create_job_model.py new file mode 100644 index 0000000000000..90e1453d5e86e --- /dev/null +++ b/posthog/temporal/data_imports/workflow_activities/create_job_model.py @@ -0,0 +1,66 @@ +import dataclasses +import uuid + +from asgiref.sync import sync_to_async +from temporalio import activity + +# TODO: remove dependency +from posthog.temporal.data_imports.pipelines.schemas import PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING + +from posthog.warehouse.external_data_source.jobs import ( + create_external_data_job, +) +from posthog.warehouse.models import ( + sync_old_schemas_with_new_schemas, + ExternalDataSource, +) +from posthog.warehouse.models.external_data_schema import get_postgres_schemas +from posthog.temporal.common.logger import bind_temporal_worker_logger + + +@dataclasses.dataclass +class CreateExternalDataJobModelActivityInputs: + team_id: int + schema_id: uuid.UUID + source_id: uuid.UUID + + +@activity.defn +async def create_external_data_job_model_activity(inputs: CreateExternalDataJobModelActivityInputs) -> str: + run = await sync_to_async(create_external_data_job)( + team_id=inputs.team_id, + external_data_source_id=inputs.source_id, + external_data_schema_id=inputs.schema_id, + 
workflow_id=activity.info().workflow_id, + ) + + source = await sync_to_async(ExternalDataSource.objects.get)(team_id=inputs.team_id, id=inputs.source_id) + source.status = "Running" + await sync_to_async(source.save)() + + if source.source_type == ExternalDataSource.Type.POSTGRES: + host = source.job_inputs.get("host") + port = source.job_inputs.get("port") + user = source.job_inputs.get("user") + password = source.job_inputs.get("password") + database = source.job_inputs.get("database") + schema = source.job_inputs.get("schema") + schemas_to_sync = await sync_to_async(get_postgres_schemas)(host, port, database, user, password, schema) + else: + schemas_to_sync = list(PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING.get(source.source_type, ())) + + # TODO: this could cause a race condition where each schema worker creates the missing schema + + await sync_to_async(sync_old_schemas_with_new_schemas)( # type: ignore + schemas_to_sync, + source_id=inputs.source_id, + team_id=inputs.team_id, + ) + + logger = await bind_temporal_worker_logger(team_id=inputs.team_id) + + logger.info( + f"Created external data job with for external data source {inputs.source_id}", + ) + + return str(run.id) diff --git a/posthog/temporal/data_imports/workflow_activities/import_data.py b/posthog/temporal/data_imports/workflow_activities/import_data.py new file mode 100644 index 0000000000000..b646727457c95 --- /dev/null +++ b/posthog/temporal/data_imports/workflow_activities/import_data.py @@ -0,0 +1,142 @@ +import dataclasses +import uuid + +from dlt.common.schema.typing import TSchemaTables +from temporalio import activity + +# TODO: remove dependency +from posthog.temporal.data_imports.pipelines.helpers import aupdate_job_count +from posthog.temporal.data_imports.pipelines.zendesk.credentials import ZendeskCredentialsToken + +from posthog.temporal.data_imports.pipelines.pipeline import DataImportPipeline, PipelineInputs +from posthog.warehouse.models import ( + ExternalDataJob, + ExternalDataSource, + get_external_data_job, +) +from posthog.temporal.common.logger import bind_temporal_worker_logger +from typing import Dict, Tuple +import asyncio +from django.conf import settings + +from posthog.warehouse.models.external_data_schema import ExternalDataSchema, aget_schema_by_id + + +@dataclasses.dataclass +class ImportDataActivityInputs: + team_id: int + schema_id: uuid.UUID + source_id: uuid.UUID + run_id: str + + +@activity.defn +async def import_data_activity(inputs: ImportDataActivityInputs) -> Tuple[TSchemaTables, Dict[str, int]]: # noqa: F821 + model: ExternalDataJob = await get_external_data_job( + job_id=inputs.run_id, + ) + + logger = await bind_temporal_worker_logger(team_id=inputs.team_id) + + job_inputs = PipelineInputs( + source_id=inputs.source_id, + schema_id=inputs.schema_id, + run_id=inputs.run_id, + team_id=inputs.team_id, + job_type=model.pipeline.source_type, + dataset_name=model.folder_path, + ) + + schema: ExternalDataSchema = await aget_schema_by_id(inputs.schema_id, inputs.team_id) + + endpoints = schema.name + + source = None + if model.pipeline.source_type == ExternalDataSource.Type.STRIPE: + from posthog.temporal.data_imports.pipelines.stripe.helpers import stripe_source + + stripe_secret_key = model.pipeline.job_inputs.get("stripe_secret_key", None) + account_id = model.pipeline.job_inputs.get("stripe_account_id", None) + # Cludge: account_id should be checked here too but can deal with nulls + # until we require re update of account_ids in stripe so they're all store + if not stripe_secret_key: + 
raise ValueError(f"Stripe secret key not found for job {model.id}") + source = stripe_source( + api_key=stripe_secret_key, + account_id=account_id, + endpoints=tuple(endpoints), + team_id=inputs.team_id, + job_id=inputs.run_id, + ) + elif model.pipeline.source_type == ExternalDataSource.Type.HUBSPOT: + from posthog.temporal.data_imports.pipelines.hubspot.auth import refresh_access_token + from posthog.temporal.data_imports.pipelines.hubspot import hubspot + + hubspot_access_code = model.pipeline.job_inputs.get("hubspot_secret_key", None) + refresh_token = model.pipeline.job_inputs.get("hubspot_refresh_token", None) + if not refresh_token: + raise ValueError(f"Hubspot refresh token not found for job {model.id}") + + if not hubspot_access_code: + hubspot_access_code = refresh_access_token(refresh_token) + + source = hubspot( + api_key=hubspot_access_code, + refresh_token=refresh_token, + endpoints=tuple(endpoints), + ) + elif model.pipeline.source_type == ExternalDataSource.Type.POSTGRES: + from posthog.temporal.data_imports.pipelines.postgres import postgres_source + + host = model.pipeline.job_inputs.get("host") + port = model.pipeline.job_inputs.get("port") + user = model.pipeline.job_inputs.get("user") + password = model.pipeline.job_inputs.get("password") + database = model.pipeline.job_inputs.get("database") + pg_schema = model.pipeline.job_inputs.get("schema") + + source = postgres_source( + host=host, + port=port, + user=user, + password=password, + database=database, + sslmode="prefer" if settings.TEST or settings.DEBUG else "require", + schema=pg_schema, + table_names=endpoints, + ) + elif model.pipeline.source_type == ExternalDataSource.Type.ZENDESK: + from posthog.temporal.data_imports.pipelines.zendesk.helpers import zendesk_support + + credentials = ZendeskCredentialsToken() + credentials.token = model.pipeline.job_inputs.get("zendesk_api_key") + credentials.subdomain = model.pipeline.job_inputs.get("zendesk_subdomain") + credentials.email = model.pipeline.job_inputs.get("zendesk_email_address") + + data_support = zendesk_support(credentials=credentials, endpoints=tuple(endpoints), team_id=inputs.team_id) + # Uncomment to support zendesk chat and talk + # data_chat = zendesk_chat() + # data_talk = zendesk_talk() + + source = data_support + else: + raise ValueError(f"Source type {model.pipeline.source_type} not supported") + + # Temp background heartbeat for now + async def heartbeat() -> None: + while True: + await asyncio.sleep(10) + activity.heartbeat() + + heartbeat_task = asyncio.create_task(heartbeat()) + + try: + table_row_counts = await DataImportPipeline(job_inputs, source, logger).run() + total_rows_synced = sum(table_row_counts.values()) + + await aupdate_job_count(inputs.run_id, inputs.team_id, total_rows_synced) + finally: + heartbeat_task.cancel() + await asyncio.wait([heartbeat_task]) + + return source.schema.tables, table_row_counts diff --git a/posthog/temporal/utils.py b/posthog/temporal/utils.py new file mode 100644 index 0000000000000..39722c6a76b92 --- /dev/null +++ b/posthog/temporal/utils.py @@ -0,0 +1,10 @@ +import dataclasses +import uuid + + +# Dataclass living here to avoid circular reference +@dataclasses.dataclass +class ExternalDataWorkflowInputs: + team_id: int + external_data_source_id: uuid.UUID + external_data_schema_id: uuid.UUID | None diff --git a/posthog/warehouse/data_load/service.py b/posthog/warehouse/data_load/service.py index 957197147d476..78f99cd9401b8 100644 --- a/posthog/warehouse/data_load/service.py +++ 
b/posthog/warehouse/data_load/service.py @@ -16,14 +16,13 @@ from posthog.temporal.common.schedule import ( create_schedule, pause_schedule, + schedule_exists, trigger_schedule, update_schedule, delete_schedule, unpause_schedule, ) -from posthog.temporal.data_imports.external_data_job import ( - ExternalDataWorkflowInputs, -) +from posthog.temporal.utils import ExternalDataWorkflowInputs from posthog.warehouse.models import ExternalDataSource import temporalio from temporalio.client import Client as TemporalClient @@ -32,46 +31,54 @@ from django.conf import settings import s3fs +from posthog.warehouse.models.external_data_schema import ExternalDataSchema + def sync_external_data_job_workflow( - external_data_source: ExternalDataSource, create: bool = False -) -> ExternalDataSource: + external_data_schema: ExternalDataSchema, create: bool = False +) -> ExternalDataSchema: temporal = sync_connect() inputs = ExternalDataWorkflowInputs( - team_id=external_data_source.team.id, - external_data_source_id=external_data_source.pk, + team_id=external_data_schema.team.id, + external_data_schema_id=external_data_schema.id, + external_data_source_id=external_data_schema.source_id, ) schedule = Schedule( action=ScheduleActionStartWorkflow( "external-data-job", asdict(inputs), - id=str(external_data_source.pk), + id=str(external_data_schema.id), task_queue=str(DATA_WAREHOUSE_TASK_QUEUE), ), spec=ScheduleSpec( intervals=[ ScheduleIntervalSpec( - every=timedelta(hours=24), offset=timedelta(hours=external_data_source.created_at.hour) + every=timedelta(hours=24), offset=timedelta(hours=external_data_schema.created_at.hour) ) ], jitter=timedelta(hours=2), ), - state=ScheduleState(note=f"Schedule for external data source: {external_data_source.pk}"), + state=ScheduleState(note=f"Schedule for external data source: {external_data_schema.pk}"), policy=SchedulePolicy(overlap=ScheduleOverlapPolicy.SKIP), ) if create: - create_schedule(temporal, id=str(external_data_source.id), schedule=schedule, trigger_immediately=True) + create_schedule(temporal, id=str(external_data_schema.id), schedule=schedule, trigger_immediately=True) else: - update_schedule(temporal, id=str(external_data_source.id), schedule=schedule) + update_schedule(temporal, id=str(external_data_schema.id), schedule=schedule) + + return external_data_schema - return external_data_source + +def trigger_external_data_workflow(external_data_schema: ExternalDataSchema): + temporal = sync_connect() + trigger_schedule(temporal, schedule_id=str(external_data_schema.id)) -def trigger_external_data_workflow(external_data_source: ExternalDataSource): +def external_data_workflow_exists(id: str) -> bool: temporal = sync_connect() - trigger_schedule(temporal, schedule_id=str(external_data_source.id)) + return schedule_exists(temporal, schedule_id=id) def pause_external_data_schedule(external_data_source: ExternalDataSource): diff --git a/posthog/warehouse/data_load/validate_schema.py b/posthog/warehouse/data_load/validate_schema.py index 44190a42d3d2e..052bb2721b7d9 100644 --- a/posthog/warehouse/data_load/validate_schema.py +++ b/posthog/warehouse/data_load/validate_schema.py @@ -1,3 +1,4 @@ +import uuid from django.conf import settings from dlt.common.schema.typing import TSchemaTables from dlt.common.data_types.typing import TDataType @@ -26,8 +27,9 @@ from posthog.temporal.common.logger import bind_temporal_worker_logger from clickhouse_driver.errors import ServerException from asgiref.sync import sync_to_async -from typing import Dict, Tuple, Type +from typing 
import Dict, Type from posthog.utils import camel_to_snake_case +from posthog.warehouse.models.external_data_schema import ExternalDataSchema def dlt_to_hogql_type(dlt_type: TDataType | None) -> str: @@ -91,7 +93,7 @@ async def validate_schema( async def validate_schema_and_update_table( run_id: str, team_id: int, - schemas: list[Tuple[str, str]], + schema_id: uuid.UUID, table_schema: TSchemaTables, table_row_counts: Dict[str, int], ) -> None: @@ -103,51 +105,40 @@ async def validate_schema_and_update_table( Arguments: run_id: The id of the external data job team_id: The id of the team - schemas: The list of schemas that have been synced by the external data job + schema_id: The schema for which the data job relates to + table_schema: The DLT schema from the data load stage + table_row_counts: The count of synced rows from DLT """ logger = await bind_temporal_worker_logger(team_id=team_id) job: ExternalDataJob = await get_external_data_job(job_id=run_id) - last_successful_job: ExternalDataJob | None = await get_latest_run_if_exists(job.team_id, job.pipeline_id) + last_successful_job: ExternalDataJob | None = await get_latest_run_if_exists(team_id, job.pipeline_id) credential: DataWarehouseCredential = await get_or_create_datawarehouse_credential( - team_id=job.team_id, + team_id=team_id, access_key=settings.AIRBYTE_BUCKET_KEY, access_secret=settings.AIRBYTE_BUCKET_SECRET, ) - for _schema in schemas: - _schema_id = _schema[0] - _schema_name = _schema[1] + external_data_schema: ExternalDataSchema = await aget_schema_by_id(schema_id, team_id) - table_name = f"{job.pipeline.prefix or ''}{job.pipeline.source_type}_{_schema_name}".lower() - new_url_pattern = job.url_pattern_by_schema(camel_to_snake_case(_schema_name)) - row_count = table_row_counts.get(_schema_name, 0) + _schema_id = external_data_schema.id + _schema_name = external_data_schema.name - # Check - try: - data = await validate_schema( - credential=credential, - table_name=table_name, - new_url_pattern=new_url_pattern, - team_id=team_id, - row_count=row_count, - ) - except ServerException as err: - if err.code == 636: - logger.exception( - f"Data Warehouse: No data for schema {_schema_name} for external data job {job.pk}", - exc_info=err, - ) - continue - except Exception as e: - # TODO: handle other exceptions here - logger.exception( - f"Data Warehouse: Could not validate schema for external data job {job.pk}", - exc_info=e, - ) - continue + table_name = f"{job.pipeline.prefix or ''}{job.pipeline.source_type}_{_schema_name}".lower() + new_url_pattern = job.url_pattern_by_schema(camel_to_snake_case(_schema_name)) + row_count = table_row_counts.get(_schema_name, 0) + + # Check + try: + data = await validate_schema( + credential=credential, + table_name=table_name, + new_url_pattern=new_url_pattern, + team_id=team_id, + row_count=row_count, + ) # create or update table_created = None @@ -190,13 +181,26 @@ async def validate_schema_and_update_table( await asave_datawarehousetable(table_created) # schema could have been deleted by this point - schema_model = await aget_schema_by_id(schema_id=_schema_id, team_id=job.team_id) + schema_model = await aget_schema_by_id(schema_id=_schema_id, team_id=team_id) if schema_model: schema_model.table = table_created schema_model.last_synced_at = job.created_at await asave_external_data_schema(schema_model) + except ServerException as err: + if err.code == 636: + logger.exception( + f"Data Warehouse: No data for schema {_schema_name} for external data job {job.pk}", + exc_info=err, + ) + except 
Exception as e: + # TODO: handle other exceptions here + logger.exception( + f"Data Warehouse: Could not validate schema for external data job {job.pk}", + exc_info=e, + ) + if last_successful_job: try: last_successful_job.delete_data_in_bucket() @@ -205,4 +209,3 @@ async def validate_schema_and_update_table( f"Data Warehouse: Could not delete deprecated data source {last_successful_job.pk}", exc_info=e, ) - pass diff --git a/posthog/warehouse/external_data_source/jobs.py b/posthog/warehouse/external_data_source/jobs.py index 7370615e9e3e7..b56550b1b206d 100644 --- a/posthog/warehouse/external_data_source/jobs.py +++ b/posthog/warehouse/external_data_source/jobs.py @@ -10,12 +10,14 @@ def get_external_data_source(team_id: str, external_data_source_id: str) -> Exte def create_external_data_job( external_data_source_id: UUID, + external_data_schema_id: UUID, workflow_id: str, - team_id: str, + team_id: int, ) -> ExternalDataJob: job = ExternalDataJob.objects.create( team_id=team_id, pipeline_id=external_data_source_id, + schema_id=external_data_schema_id, status=ExternalDataJob.Status.RUNNING, rows_synced=0, workflow_id=workflow_id, @@ -24,7 +26,7 @@ def create_external_data_job( return job -def update_external_job_status(run_id: UUID, team_id: str, status: str, latest_error: str | None) -> ExternalDataJob: +def update_external_job_status(run_id: UUID, team_id: int, status: str, latest_error: str | None) -> ExternalDataJob: model = ExternalDataJob.objects.get(id=run_id, team_id=team_id) model.status = status model.latest_error = latest_error diff --git a/posthog/warehouse/models/external_data_job.py b/posthog/warehouse/models/external_data_job.py index bb357d3ef7211..c7ca3b83933af 100644 --- a/posthog/warehouse/models/external_data_job.py +++ b/posthog/warehouse/models/external_data_job.py @@ -16,6 +16,7 @@ class Status(models.TextChoices): team: models.ForeignKey = models.ForeignKey(Team, on_delete=models.CASCADE) pipeline: models.ForeignKey = models.ForeignKey("posthog.ExternalDataSource", on_delete=models.CASCADE) + schema: models.ForeignKey = models.ForeignKey("posthog.ExternalDataSchema", on_delete=models.CASCADE, null=True) status: models.CharField = models.CharField(max_length=400) rows_synced: models.BigIntegerField = models.BigIntegerField(null=True, blank=True) latest_error: models.TextField = models.TextField( diff --git a/posthog/warehouse/models/external_data_schema.py b/posthog/warehouse/models/external_data_schema.py index 8b1189131ae6d..d8a4e532680ed 100644 --- a/posthog/warehouse/models/external_data_schema.py +++ b/posthog/warehouse/models/external_data_schema.py @@ -48,13 +48,11 @@ def aget_schema_by_id(schema_id: str, team_id: int) -> ExternalDataSchema | None def get_active_schemas_for_source_id(source_id: uuid.UUID, team_id: int): - schemas = ExternalDataSchema.objects.filter(team_id=team_id, source_id=source_id, should_sync=True).values().all() - return [(val["id"], val["name"]) for val in schemas] + return ExternalDataSchema.objects.filter(team_id=team_id, source_id=source_id, should_sync=True).all() def get_all_schemas_for_source_id(source_id: uuid.UUID, team_id: int): - schemas = ExternalDataSchema.objects.filter(team_id=team_id, source_id=source_id).values().all() - return [val["name"] for val in schemas] + return ExternalDataSchema.objects.filter(team_id=team_id, source_id=source_id).all() def sync_old_schemas_with_new_schemas(new_schemas: list, source_id: uuid.UUID, team_id: int): From 06bd71aac5fba1e1fd11a55efdfae0cd9d5ff8da Mon Sep 17 00:00:00 2001 From: Tom 
Owers Date: Thu, 11 Apr 2024 17:59:16 +0100 Subject: [PATCH 02/26] Reworked the worker to self-manage making schema schedules and use async temporal calls --- posthog/temporal/common/client.py | 13 ++++ posthog/temporal/common/schedule.py | 37 ++++++++++- posthog/temporal/data_imports/__init__.py | 2 + .../data_imports/external_data_job.py | 63 ++++++++++++++----- .../workflow_activities/import_data.py | 2 +- posthog/temporal/utils.py | 2 +- posthog/warehouse/api/external_data_source.py | 7 +++ posthog/warehouse/data_load/service.py | 62 ++++++++++++++---- .../warehouse/models/external_data_schema.py | 7 ++- 9 files changed, 161 insertions(+), 34 deletions(-) diff --git a/posthog/temporal/common/client.py b/posthog/temporal/common/client.py index 5794115c5cd18..87c9eb939f599 100644 --- a/posthog/temporal/common/client.py +++ b/posthog/temporal/common/client.py @@ -49,3 +49,16 @@ async def sync_connect() -> Client: settings.TEMPORAL_CLIENT_KEY, ) return client + + +async def async_connect() -> Client: + """Asynchronous connect to Temporal and return a Client.""" + client = await connect( + settings.TEMPORAL_HOST, + settings.TEMPORAL_PORT, + settings.TEMPORAL_NAMESPACE, + settings.TEMPORAL_CLIENT_ROOT_CA, + settings.TEMPORAL_CLIENT_CERT, + settings.TEMPORAL_CLIENT_KEY, + ) + return client diff --git a/posthog/temporal/common/schedule.py b/posthog/temporal/common/schedule.py index f31e1ac1341c8..7e2ae334a73bc 100644 --- a/posthog/temporal/common/schedule.py +++ b/posthog/temporal/common/schedule.py @@ -12,6 +12,15 @@ async def create_schedule(temporal: Client, id: str, schedule: Schedule, trigger ) +async def a_create_schedule(temporal: Client, id: str, schedule: Schedule, trigger_immediately: bool = False): + """Async create a Temporal Schedule.""" + return await temporal.create_schedule( + id=id, + schedule=schedule, + trigger_immediately=trigger_immediately, + ) + + @async_to_sync async def update_schedule(temporal: Client, id: str, schedule: Schedule) -> None: """Update a Temporal Schedule.""" @@ -25,6 +34,18 @@ async def updater(_: ScheduleUpdateInput) -> ScheduleUpdate: ) +async def a_update_schedule(temporal: Client, id: str, schedule: Schedule) -> None: + """Async update a Temporal Schedule.""" + handle = temporal.get_schedule_handle(id) + + async def updater(_: ScheduleUpdateInput) -> ScheduleUpdate: + return ScheduleUpdate(schedule=schedule) + + return await handle.update( + updater=updater, + ) + + @async_to_sync async def unpause_schedule(temporal: Client, schedule_id: str, note: str | None = None) -> None: """Unpause a Temporal Schedule.""" @@ -39,6 +60,12 @@ async def delete_schedule(temporal: Client, schedule_id: str) -> None: await handle.delete() +async def a_delete_schedule(temporal: Client, schedule_id: str) -> None: + """Async delete a Temporal Schedule.""" + handle = temporal.get_schedule_handle(schedule_id) + await handle.delete() + + @async_to_sync async def describe_schedule(temporal: Client, schedule_id: str): """Describe a Temporal Schedule.""" @@ -60,10 +87,16 @@ async def trigger_schedule(temporal: Client, schedule_id: str, note: str | None await handle.trigger() -def schedule_exists(temporal: Client, schedule_id: str) -> bool: +async def a_trigger_schedule(temporal: Client, schedule_id: str, note: str | None = None) -> None: + """Trigger a Temporal Schedule.""" + handle = temporal.get_schedule_handle(schedule_id) + await handle.trigger() + + +async def a_schedule_exists(temporal: Client, schedule_id: str) -> bool: """Check whether a schedule exists.""" try: - 
temporal.get_schedule_handle(schedule_id) + await temporal.get_schedule_handle(schedule_id).describe() return True except: return False diff --git a/posthog/temporal/data_imports/__init__.py b/posthog/temporal/data_imports/__init__.py index 481af1e6d7b60..3259e91f002cf 100644 --- a/posthog/temporal/data_imports/__init__.py +++ b/posthog/temporal/data_imports/__init__.py @@ -5,6 +5,7 @@ import_data_activity, update_external_data_job_model, validate_schema_activity, + check_schedule_activity, ) WORKFLOWS = [ExternalDataJobWorkflow] @@ -15,4 +16,5 @@ import_data_activity, validate_schema_activity, create_source_templates, + check_schedule_activity, ] diff --git a/posthog/temporal/data_imports/external_data_job.py b/posthog/temporal/data_imports/external_data_job.py index bb8cfb5373390..62a47092d81bd 100644 --- a/posthog/temporal/data_imports/external_data_job.py +++ b/posthog/temporal/data_imports/external_data_job.py @@ -17,10 +17,10 @@ ) from posthog.temporal.data_imports.workflow_activities.import_data import ImportDataActivityInputs, import_data_activity from posthog.warehouse.data_load.service import ( - delete_external_data_schedule, - external_data_workflow_exists, - sync_external_data_job_workflow, - trigger_external_data_workflow, + a_delete_external_data_schedule, + a_external_data_workflow_exists, + a_sync_external_data_job_workflow, + a_trigger_external_data_workflow, ) from posthog.warehouse.data_load.source_templates import create_warehouse_templates_for_source @@ -97,6 +97,32 @@ async def create_source_templates(inputs: CreateSourceTemplateInputs) -> None: await create_warehouse_templates_for_source(team_id=inputs.team_id, run_id=inputs.run_id) +@activity.defn +async def check_schedule_activity(inputs: ExternalDataWorkflowInputs) -> bool: + logger = await bind_temporal_worker_logger(team_id=inputs.team_id) + + # Creates schedules for all schemas if they don't exist yet, and then remove itself as a source schedule + if inputs.external_data_schema_id is None: + logger.info("Schema ID is none, creating schedules for schemas...") + schemas = await get_active_schemas_for_source_id( + team_id=inputs.team_id, source_id=inputs.external_data_source_id + ) + for schema in schemas: + if await a_external_data_workflow_exists(schema.id): + await a_trigger_external_data_workflow(schema) + logger.info(f"Schedule exists for schema {schema.id}. Triggered schedule") + else: + await a_sync_external_data_job_workflow(schema, create=True) + logger.info(f"Created schedule for schema {schema.id}") + # Delete the source schedule in favour of the schema schedules + await a_delete_external_data_schedule(ExternalDataSource(id=inputs.external_data_source_id)) + logger.info(f"Deleted schedule for source {inputs.external_data_source_id}") + return True + + logger.info("Schema ID is set. 
Continuing...") + return False + + # TODO: update retry policies @workflow.defn(name="external-data-job") class ExternalDataJobWorkflow(PostHogWorkflow): @@ -107,21 +133,24 @@ def parse_inputs(inputs: list[str]) -> ExternalDataWorkflowInputs: @workflow.run async def run(self, inputs: ExternalDataWorkflowInputs): - # Creates schedules for all schemas if they don't exist yet, and then remove itself as a source schedule - if inputs.external_data_schema_id is None: - schemas = await sync_to_async(get_active_schemas_for_source_id)( - team_id=inputs.team_id, source_id=inputs.external_data_source_id - ) - for schema in schemas: - if external_data_workflow_exists(schema.id): - trigger_external_data_workflow(schema) - else: - sync_external_data_job_workflow(schema, create=True) - # Delete the source schedule in favour of the schema schedules - delete_external_data_schedule(ExternalDataSource(id=inputs.external_data_source_id)) + logger = await bind_temporal_worker_logger(team_id=inputs.team_id) + + should_exit = await workflow.execute_activity( + check_schedule_activity, + inputs, + start_to_close_timeout=dt.timedelta(minutes=1), + retry_policy=RetryPolicy( + initial_interval=dt.timedelta(seconds=10), + maximum_interval=dt.timedelta(seconds=60), + maximum_attempts=0, + non_retryable_error_types=["NotNullViolation", "IntegrityError"], + ), + ) + + if should_exit: return - logger = await bind_temporal_worker_logger(team_id=inputs.team_id) + assert inputs.external_data_schema_id is not None # create external data job and trigger activity create_external_data_job_inputs = CreateExternalDataJobModelActivityInputs( diff --git a/posthog/temporal/data_imports/workflow_activities/import_data.py b/posthog/temporal/data_imports/workflow_activities/import_data.py index b646727457c95..98ac8d60fa468 100644 --- a/posthog/temporal/data_imports/workflow_activities/import_data.py +++ b/posthog/temporal/data_imports/workflow_activities/import_data.py @@ -49,7 +49,7 @@ async def import_data_activity(inputs: ImportDataActivityInputs) -> Tuple[TSchem schema: ExternalDataSchema = await aget_schema_by_id(inputs.schema_id, inputs.team_id) - endpoints = schema.name + endpoints = [schema.name] source = None if model.pipeline.source_type == ExternalDataSource.Type.STRIPE: diff --git a/posthog/temporal/utils.py b/posthog/temporal/utils.py index 39722c6a76b92..fa96af6442489 100644 --- a/posthog/temporal/utils.py +++ b/posthog/temporal/utils.py @@ -7,4 +7,4 @@ class ExternalDataWorkflowInputs: team_id: int external_data_source_id: uuid.UUID - external_data_schema_id: uuid.UUID | None + external_data_schema_id: uuid.UUID | None = None diff --git a/posthog/warehouse/api/external_data_source.py b/posthog/warehouse/api/external_data_source.py index d1c766eb2b2eb..c5084bec15b7f 100644 --- a/posthog/warehouse/api/external_data_source.py +++ b/posthog/warehouse/api/external_data_source.py @@ -348,6 +348,13 @@ def reload(self, request: Request, *args: Any, **kwargs: Any): trigger_external_data_workflow(instance) except temporalio.service.RPCError as e: + # if the source schedule has been removed - trigger the schema schedules + if e.message == "workflow execution already completed": + for schema in ExternalDataSchema.objects.filter( + team_id=self.team_id, source_id=instance.id, should_sync=True + ).all(): + trigger_external_data_workflow(schema) + # schedule doesn't exist if e.message == "sql: no rows in result set": sync_external_data_job_workflow(instance, create=True) diff --git a/posthog/warehouse/data_load/service.py 
b/posthog/warehouse/data_load/service.py index 78f99cd9401b8..b5969303622bf 100644 --- a/posthog/warehouse/data_load/service.py +++ b/posthog/warehouse/data_load/service.py @@ -12,11 +12,15 @@ ) from posthog.constants import DATA_WAREHOUSE_TASK_QUEUE -from posthog.temporal.common.client import sync_connect +from posthog.temporal.common.client import async_connect, sync_connect from posthog.temporal.common.schedule import ( + a_create_schedule, + a_delete_schedule, + a_trigger_schedule, + a_update_schedule, create_schedule, pause_schedule, - schedule_exists, + a_schedule_exists, trigger_schedule, update_schedule, delete_schedule, @@ -34,17 +38,14 @@ from posthog.warehouse.models.external_data_schema import ExternalDataSchema -def sync_external_data_job_workflow( - external_data_schema: ExternalDataSchema, create: bool = False -) -> ExternalDataSchema: - temporal = sync_connect() +def get_sync_schedule(external_data_schema: ExternalDataSchema): inputs = ExternalDataWorkflowInputs( - team_id=external_data_schema.team.id, + team_id=external_data_schema.team_id, external_data_schema_id=external_data_schema.id, external_data_source_id=external_data_schema.source_id, ) - schedule = Schedule( + return Schedule( action=ScheduleActionStartWorkflow( "external-data-job", asdict(inputs), @@ -63,6 +64,14 @@ def sync_external_data_job_workflow( policy=SchedulePolicy(overlap=ScheduleOverlapPolicy.SKIP), ) + +def sync_external_data_job_workflow( + external_data_schema: ExternalDataSchema, create: bool = False +) -> ExternalDataSchema: + temporal = sync_connect() + + schedule = get_sync_schedule(external_data_schema) + if create: create_schedule(temporal, id=str(external_data_schema.id), schedule=schedule, trigger_immediately=True) else: @@ -71,14 +80,34 @@ def sync_external_data_job_workflow( return external_data_schema +async def a_sync_external_data_job_workflow( + external_data_schema: ExternalDataSchema, create: bool = False +) -> ExternalDataSchema: + temporal = await async_connect() + + schedule = get_sync_schedule(external_data_schema) + + if create: + await a_create_schedule(temporal, id=str(external_data_schema.id), schedule=schedule, trigger_immediately=True) + else: + await a_update_schedule(temporal, id=str(external_data_schema.id), schedule=schedule) + + return external_data_schema + + def trigger_external_data_workflow(external_data_schema: ExternalDataSchema): temporal = sync_connect() trigger_schedule(temporal, schedule_id=str(external_data_schema.id)) -def external_data_workflow_exists(id: str) -> bool: - temporal = sync_connect() - return schedule_exists(temporal, schedule_id=id) +async def a_trigger_external_data_workflow(external_data_schema: ExternalDataSchema): + temporal = await async_connect() + await a_trigger_schedule(temporal, schedule_id=str(external_data_schema.id)) + + +async def a_external_data_workflow_exists(id: str) -> bool: + temporal = await async_connect() + return await a_schedule_exists(temporal, schedule_id=id) def pause_external_data_schedule(external_data_source: ExternalDataSource): @@ -102,6 +131,17 @@ def delete_external_data_schedule(external_data_source: ExternalDataSource): raise +async def a_delete_external_data_schedule(external_data_source: ExternalDataSource): + temporal = await async_connect() + try: + await a_delete_schedule(temporal, schedule_id=str(external_data_source.id)) + except temporalio.service.RPCError as e: + # Swallow error if schedule does not exist already + if e.status == temporalio.service.RPCStatusCode.NOT_FOUND: + return + raise + + 
def cancel_external_data_workflow(workflow_id: str): temporal = sync_connect() cancel_workflow(temporal, workflow_id) diff --git a/posthog/warehouse/models/external_data_schema.py b/posthog/warehouse/models/external_data_schema.py index d8a4e532680ed..d6d416a94c64d 100644 --- a/posthog/warehouse/models/external_data_schema.py +++ b/posthog/warehouse/models/external_data_schema.py @@ -47,8 +47,9 @@ def aget_schema_by_id(schema_id: str, team_id: int) -> ExternalDataSchema | None return ExternalDataSchema.objects.get(id=schema_id, team_id=team_id) +@database_sync_to_async def get_active_schemas_for_source_id(source_id: uuid.UUID, team_id: int): - return ExternalDataSchema.objects.filter(team_id=team_id, source_id=source_id, should_sync=True).all() + return list(ExternalDataSchema.objects.filter(team_id=team_id, source_id=source_id, should_sync=True).all()) def get_all_schemas_for_source_id(source_id: uuid.UUID, team_id: int): @@ -57,7 +58,9 @@ def get_all_schemas_for_source_id(source_id: uuid.UUID, team_id: int): def sync_old_schemas_with_new_schemas(new_schemas: list, source_id: uuid.UUID, team_id: int): old_schemas = get_all_schemas_for_source_id(source_id=source_id, team_id=team_id) - schemas_to_create = [schema for schema in new_schemas if schema not in old_schemas] + old_schemas_names = [schema.name for schema in old_schemas] + + schemas_to_create = [schema for schema in new_schemas if schema not in old_schemas_names] for schema in schemas_to_create: ExternalDataSchema.objects.create(name=schema, team_id=team_id, source_id=source_id, should_sync=False) From 133ae4300c01a4a5387d3c613603d317c4481347 Mon Sep 17 00:00:00 2001 From: Tom Owers Date: Fri, 12 Apr 2024 13:17:43 +0100 Subject: [PATCH 03/26] Added schema status and use it for the job status --- .../migrations/0401_externaldatajob_schema.py | 5 ++++ .../workflow_activities/create_job_model.py | 8 ++++--- posthog/warehouse/api/external_data_source.py | 23 +++++++++++++++++++ .../warehouse/external_data_source/jobs.py | 7 +++--- .../warehouse/models/external_data_schema.py | 8 +++++++ .../warehouse/models/external_data_source.py | 2 ++ 6 files changed, 47 insertions(+), 6 deletions(-) diff --git a/posthog/migrations/0401_externaldatajob_schema.py b/posthog/migrations/0401_externaldatajob_schema.py index 8b089ca97c22a..09ce3b5f86874 100644 --- a/posthog/migrations/0401_externaldatajob_schema.py +++ b/posthog/migrations/0401_externaldatajob_schema.py @@ -17,4 +17,9 @@ class Migration(migrations.Migration): null=True, on_delete=django.db.models.deletion.CASCADE, to="posthog.externaldataschema" ), ), + migrations.AddField( + model_name="externaldataschema", + name="status", + field=models.CharField(max_length=400, null=True), + ), ] diff --git a/posthog/temporal/data_imports/workflow_activities/create_job_model.py b/posthog/temporal/data_imports/workflow_activities/create_job_model.py index 90e1453d5e86e..d03e4173e2de5 100644 --- a/posthog/temporal/data_imports/workflow_activities/create_job_model.py +++ b/posthog/temporal/data_imports/workflow_activities/create_job_model.py @@ -14,7 +14,7 @@ sync_old_schemas_with_new_schemas, ExternalDataSource, ) -from posthog.warehouse.models.external_data_schema import get_postgres_schemas +from posthog.warehouse.models.external_data_schema import ExternalDataSchema, get_postgres_schemas from posthog.temporal.common.logger import bind_temporal_worker_logger @@ -34,9 +34,11 @@ async def create_external_data_job_model_activity(inputs: CreateExternalDataJobM workflow_id=activity.info().workflow_id, ) + 
schema = await sync_to_async(ExternalDataSchema.objects.get)(team_id=inputs.team_id, id=inputs.schema_id) + schema.status = ExternalDataSchema.Status.RUNNING + await sync_to_async(schema.save)() + source = await sync_to_async(ExternalDataSource.objects.get)(team_id=inputs.team_id, id=inputs.source_id) - source.status = "Running" - await sync_to_async(source.save)() if source.source_type == ExternalDataSource.Type.POSTGRES: host = source.job_inputs.get("host") diff --git a/posthog/warehouse/api/external_data_source.py b/posthog/warehouse/api/external_data_source.py index c5084bec15b7f..bd181390420ff 100644 --- a/posthog/warehouse/api/external_data_source.py +++ b/posthog/warehouse/api/external_data_source.py @@ -41,6 +41,7 @@ class ExternalDataSourceSerializers(serializers.ModelSerializer): account_id = serializers.CharField(write_only=True) client_secret = serializers.CharField(write_only=True) last_run_at = serializers.SerializerMethodField(read_only=True) + status = serializers.SerializerMethodField(read_only=True) schemas = serializers.SerializerMethodField(read_only=True) class Meta: @@ -68,6 +69,28 @@ def get_last_run_at(self, instance: ExternalDataSource) -> str: return latest_completed_run.created_at if latest_completed_run else None + def get_status(self, instance: ExternalDataSource) -> str: + active_schemas: List[ExternalDataSchema] = list(instance.schemas.filter(should_sync=True).all()) + any_failures = any(schema.status == ExternalDataSchema.Status.ERROR for schema in active_schemas) + any_cancelled = any(schema.status == ExternalDataSchema.Status.CANCELLED for schema in active_schemas) + any_paused = any(schema.status == ExternalDataSchema.Status.PAUSED for schema in active_schemas) + any_running = any(schema.status == ExternalDataSchema.Status.RUNNING for schema in active_schemas) + any_completed = any(schema.status == ExternalDataSchema.Status.COMPLETED for schema in active_schemas) + + if any_failures: + return ExternalDataSchema.Status.ERROR + elif any_cancelled: + return ExternalDataSchema.Status.CANCELLED + elif any_paused: + return ExternalDataSchema.Status.PAUSED + elif any_running: + return ExternalDataSchema.Status.RUNNING + elif any_completed: + return ExternalDataSchema.Status.COMPLETED + else: + # Fallback during migration phase of going from source -> schema as the source of truth for syncs + return instance.status + def get_schemas(self, instance: ExternalDataSource): schemas = instance.schemas.order_by("name").all() return ExternalDataSchemaSerializer(schemas, many=True, read_only=True, context=self.context).data diff --git a/posthog/warehouse/external_data_source/jobs.py b/posthog/warehouse/external_data_source/jobs.py index b56550b1b206d..bd8314150f70b 100644 --- a/posthog/warehouse/external_data_source/jobs.py +++ b/posthog/warehouse/external_data_source/jobs.py @@ -1,6 +1,7 @@ from uuid import UUID from posthog.warehouse.models.external_data_job import ExternalDataJob +from posthog.warehouse.models.external_data_schema import ExternalDataSchema from posthog.warehouse.models.external_data_source import ExternalDataSource @@ -32,9 +33,9 @@ def update_external_job_status(run_id: UUID, team_id: int, status: str, latest_e model.latest_error = latest_error model.save() - pipeline = ExternalDataSource.objects.get(id=model.pipeline_id, team_id=team_id) - pipeline.status = status - pipeline.save() + schema = ExternalDataSchema.objects.get(id=model.schema_id, team_id=team_id) + schema.status = status + schema.save() model.refresh_from_db() diff --git 
a/posthog/warehouse/models/external_data_schema.py b/posthog/warehouse/models/external_data_schema.py index d6d416a94c64d..bc39b96359e76 100644 --- a/posthog/warehouse/models/external_data_schema.py +++ b/posthog/warehouse/models/external_data_schema.py @@ -10,6 +10,13 @@ class ExternalDataSchema(CreatedMetaFields, UUIDModel): + class Status(models.TextChoices): + RUNNING = "Running", "Running" + PAUSED = "Paused", "Paused" + ERROR = "Error", "Error" + COMPLETED = "Completed", "Completed" + CANCELLED = "Cancelled", "Cancelled" + name: models.CharField = models.CharField(max_length=400) team: models.ForeignKey = models.ForeignKey(Team, on_delete=models.CASCADE) source: models.ForeignKey = models.ForeignKey( @@ -22,6 +29,7 @@ class ExternalDataSchema(CreatedMetaFields, UUIDModel): latest_error: models.TextField = models.TextField( null=True, help_text="The latest error that occurred when syncing this schema." ) + status: models.CharField = models.CharField(max_length=400, null=True) last_synced_at: models.DateTimeField = models.DateTimeField(null=True, blank=True) __repr__ = sane_repr("name") diff --git a/posthog/warehouse/models/external_data_source.py b/posthog/warehouse/models/external_data_source.py index 0a044c0b06315..8d05766845c57 100644 --- a/posthog/warehouse/models/external_data_source.py +++ b/posthog/warehouse/models/external_data_source.py @@ -25,6 +25,8 @@ class Status(models.TextChoices): connection_id: models.CharField = models.CharField(max_length=400) destination_id: models.CharField = models.CharField(max_length=400, null=True, blank=True) team: models.ForeignKey = models.ForeignKey(Team, on_delete=models.CASCADE) + + # `status` is deprecated in favour of external_data_schema.status status: models.CharField = models.CharField(max_length=400) source_type: models.CharField = models.CharField(max_length=128, choices=Type.choices) job_inputs: encrypted_fields.fields.EncryptedJSONField = encrypted_fields.fields.EncryptedJSONField( From 973483f306d156323ee5d5cbd18f63520cbe0137 Mon Sep 17 00:00:00 2001 From: Tom Owers Date: Fri, 12 Apr 2024 14:42:47 +0100 Subject: [PATCH 04/26] Fixed existing tests --- .../external_data/test_external_data_job.py | 262 +++++------------- .../warehouse/models/external_data_schema.py | 2 +- 2 files changed, 74 insertions(+), 190 deletions(-) diff --git a/posthog/temporal/tests/external_data/test_external_data_job.py b/posthog/temporal/tests/external_data/test_external_data_job.py index 7603a1a98d47e..b3907d2f3fa0d 100644 --- a/posthog/temporal/tests/external_data/test_external_data_job.py +++ b/posthog/temporal/tests/external_data/test_external_data_job.py @@ -6,21 +6,23 @@ from django.test import override_settings from posthog.temporal.data_imports.external_data_job import ( - CreateExternalDataJobInputs, UpdateExternalDataJobStatusInputs, ValidateSchemaInputs, - create_external_data_job, - create_external_data_job_model, + check_schedule_activity, create_source_templates, - run_external_data_job, update_external_data_job_model, validate_schema_activity, ) from posthog.temporal.data_imports.external_data_job import ( ExternalDataJobWorkflow, - ExternalDataJobInputs, ExternalDataWorkflowInputs, ) +from posthog.temporal.data_imports.workflow_activities.create_job_model import ( + CreateExternalDataJobModelActivityInputs, + create_external_data_job_model_activity, +) +from posthog.temporal.data_imports.workflow_activities.import_data import ImportDataActivityInputs, import_data_activity +from posthog.warehouse.external_data_source.jobs import 
create_external_data_job from posthog.warehouse.models import ( get_latest_run_if_exists, DataWarehouseTable, @@ -146,13 +148,16 @@ async def test_create_external_job_activity(activity_environment, team, **kwargs source_type="Stripe", ) - inputs = CreateExternalDataJobInputs(team_id=team.id, external_data_source_id=new_source.pk) + test_1_schema = await _create_schema("test-1", new_source, team) + + inputs = CreateExternalDataJobModelActivityInputs( + team_id=team.id, source_id=new_source.pk, schema_id=test_1_schema.id + ) - run_id, schemas = await activity_environment.run(create_external_data_job_model, inputs) + run_id = await activity_environment.run(create_external_data_job_model_activity, inputs) runs = ExternalDataJob.objects.filter(id=run_id) assert await sync_to_async(runs.exists)() - assert len(schemas) == 0 @pytest.mark.django_db(transaction=True) @@ -167,26 +172,18 @@ async def test_create_external_job_activity_schemas_exist(activity_environment, source_type="Stripe", ) - await sync_to_async(ExternalDataSchema.objects.create)( + schema = await sync_to_async(ExternalDataSchema.objects.create)( name=PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING[new_source.source_type][0], team_id=team.id, source_id=new_source.pk, ) - await sync_to_async(ExternalDataSchema.objects.create)( - name=PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING[new_source.source_type][1], - team_id=team.id, - source_id=new_source.pk, - should_sync=False, - ) - - inputs = CreateExternalDataJobInputs(team_id=team.id, external_data_source_id=new_source.pk) + inputs = CreateExternalDataJobModelActivityInputs(team_id=team.id, source_id=new_source.pk, schema_id=schema.id) - run_id, schemas = await activity_environment.run(create_external_data_job_model, inputs) + run_id = await activity_environment.run(create_external_data_job_model_activity, inputs) runs = ExternalDataJob.objects.filter(id=run_id) assert await sync_to_async(runs.exists)() - assert len(schemas) == 1 @pytest.mark.django_db(transaction=True) @@ -201,22 +198,16 @@ async def test_create_external_job_activity_update_schemas(activity_environment, source_type="Stripe", ) - await sync_to_async(ExternalDataSchema.objects.create)( + schema = await sync_to_async(ExternalDataSchema.objects.create)( name=PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING[new_source.source_type][0], team_id=team.id, source_id=new_source.pk, should_sync=True, ) - await sync_to_async(ExternalDataSchema.objects.create)( - name=PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING[new_source.source_type][1], - team_id=team.id, - source_id=new_source.pk, - ) - - inputs = CreateExternalDataJobInputs(team_id=team.id, external_data_source_id=new_source.pk) + inputs = CreateExternalDataJobModelActivityInputs(team_id=team.id, source_id=new_source.pk, schema_id=schema.id) - run_id, schemas = await activity_environment.run(create_external_data_job_model, inputs) + run_id = await activity_environment.run(create_external_data_job_model_activity, inputs) runs = ExternalDataJob.objects.filter(id=run_id) assert await sync_to_async(runs.exists)() @@ -241,8 +232,18 @@ async def test_update_external_job_activity(activity_environment, team, **kwargs source_type="Stripe", ) + schema = await sync_to_async(ExternalDataSchema.objects.create)( + name=PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING[new_source.source_type][0], + team_id=team.id, + source_id=new_source.pk, + should_sync=True, + ) + new_job = await sync_to_async(create_external_data_job)( - team_id=team.id, external_data_source_id=new_source.pk, workflow_id=activity_environment.info.workflow_id + 
team_id=team.id, + external_data_source_id=new_source.pk, + workflow_id=activity_environment.info.workflow_id, + external_data_schema_id=schema.id, ) inputs = UpdateExternalDataJobStatusInputs( @@ -255,8 +256,10 @@ async def test_update_external_job_activity(activity_environment, team, **kwargs await activity_environment.run(update_external_data_job_model, inputs) await sync_to_async(new_job.refresh_from_db)() + await sync_to_async(schema.refresh_from_db)() assert new_job.status == ExternalDataJob.Status.COMPLETED + assert schema.status == ExternalDataJob.Status.COMPLETED @pytest.mark.django_db(transaction=True) @@ -283,13 +286,12 @@ async def setup_job_1(): new_job = await sync_to_async(ExternalDataJob.objects.filter(id=new_job.id).prefetch_related("pipeline").get)() customer_schema = await _create_schema("Customer", new_source, team) - schemas = [(customer_schema.id, "Customer")] - inputs = ExternalDataJobInputs( + inputs = ImportDataActivityInputs( team_id=team.id, run_id=new_job.pk, source_id=new_source.pk, - schemas=schemas, + schema_id=customer_schema.id, ) return new_job, inputs @@ -314,14 +316,13 @@ async def setup_job_2(): new_job = await sync_to_async(ExternalDataJob.objects.filter(id=new_job.id).prefetch_related("pipeline").get)() - customer_schema = await _create_schema("Customer", new_source, team) invoice_schema = await _create_schema("Invoice", new_source, team) - schemas = [(customer_schema.id, "Customer"), (invoice_schema.id, "Invoice")] - inputs = ExternalDataJobInputs( + + inputs = ImportDataActivityInputs( team_id=team.id, run_id=new_job.pk, source_id=new_source.pk, - schemas=schemas, + schema_id=invoice_schema.id, ) return new_job, inputs @@ -356,26 +357,18 @@ async def setup_job_2(): "has_more": False, } await asyncio.gather( - activity_environment.run(run_external_data_job, job_1_inputs), - activity_environment.run(run_external_data_job, job_2_inputs), + activity_environment.run(import_data_activity, job_1_inputs), + activity_environment.run(import_data_activity, job_2_inputs), ) job_1_customer_objects = await minio_client.list_objects_v2( Bucket=BUCKET_NAME, Prefix=f"{job_1.folder_path}/customer/" ) - job_1_invoice_objects = await minio_client.list_objects_v2( - Bucket=BUCKET_NAME, Prefix=f"{job_1.folder_path}/invoice/" - ) assert len(job_1_customer_objects["Contents"]) == 1 - assert job_1_invoice_objects.get("Contents", None) is None - job_2_customer_objects = await minio_client.list_objects_v2( - Bucket=BUCKET_NAME, Prefix=f"{job_2.folder_path}/customer/" - ) job_2_invoice_objects = await minio_client.list_objects_v2( Bucket=BUCKET_NAME, Prefix=f"{job_2.folder_path}/invoice/" ) - assert len(job_2_customer_objects["Contents"]) == 1 assert len(job_2_invoice_objects["Contents"]) == 1 @@ -405,12 +398,12 @@ async def setup_job_1(): new_job = await sync_to_async(ExternalDataJob.objects.filter(id=new_job.id).prefetch_related("pipeline").get)() customer_schema = await _create_schema("Customer", new_source, team) - schemas = [(customer_schema.id, "Customer")] - inputs = ExternalDataJobInputs( + + inputs = ImportDataActivityInputs( team_id=team.id, run_id=new_job.pk, source_id=new_source.pk, - schemas=schemas, + schema_id=customer_schema.id, ) return new_job, inputs @@ -432,7 +425,7 @@ async def setup_job_1(): "has_more": True, } await asyncio.gather( - activity_environment.run(run_external_data_job, job_1_inputs), + activity_environment.run(import_data_activity, job_1_inputs), ) job_1_customer_objects = await minio_client.list_objects_v2( @@ -470,12 +463,12 @@ async def 
setup_job_1(): new_job = await sync_to_async(ExternalDataJob.objects.filter(id=new_job.id).prefetch_related("pipeline").get)() customer_schema = await _create_schema("Customer", new_source, team) - schemas = [(customer_schema.id, "Customer")] - inputs = ExternalDataJobInputs( + + inputs = ImportDataActivityInputs( team_id=team.id, run_id=new_job.pk, source_id=new_source.pk, - schemas=schemas, + schema_id=customer_schema.id, ) return new_job, inputs @@ -499,7 +492,7 @@ async def setup_job_1(): "has_more": False, } await asyncio.gather( - activity_environment.run(run_external_data_job, job_1_inputs), + activity_environment.run(import_data_activity, job_1_inputs), ) job_1_customer_objects = await minio_client.list_objects_v2( @@ -533,17 +526,6 @@ async def test_validate_schema_and_update_table_activity(activity_environment, t ) test_1_schema = await _create_schema("test-1", new_source, team) - test_2_schema = await _create_schema("test-2", new_source, team) - test_3_schema = await _create_schema("test-3", new_source, team) - test_4_schema = await _create_schema("test-4", new_source, team) - test_5_schema = await _create_schema("test-5", new_source, team) - schemas = [ - (test_1_schema.id, "test-1"), - (test_2_schema.id, "test-2"), - (test_3_schema.id, "test-3"), - (test_4_schema.id, "test-4"), - (test_5_schema.id, "test-5"), - ] with mock.patch( "posthog.warehouse.models.table.DataWarehouseTable.get_columns" @@ -554,21 +536,17 @@ async def test_validate_schema_and_update_table_activity(activity_environment, t ValidateSchemaInputs( run_id=new_job.pk, team_id=team.id, - schemas=schemas, + schema_id=test_1_schema.id, table_schema={ "test-1": {"name": "test-1", "resource": "test-1", "columns": {"id": {"data_type": "text"}}}, - "test-2": {"name": "test-2", "resource": "test-2", "columns": {"id": {"data_type": "text"}}}, - "test-3": {"name": "test-3", "resource": "test-3", "columns": {"id": {"data_type": "text"}}}, - "test-4": {"name": "test-4", "resource": "test-4", "columns": {"id": {"data_type": "text"}}}, - "test-5": {"name": "test-5", "resource": "test-5", "columns": {"id": {"data_type": "text"}}}, }, table_row_counts={}, ), ) - assert mock_get_columns.call_count == 10 + assert mock_get_columns.call_count == 2 assert ( - await sync_to_async(DataWarehouseTable.objects.filter(external_data_source_id=new_source.pk).count)() == 5 + await sync_to_async(DataWarehouseTable.objects.filter(external_data_source_id=new_source.pk).count)() == 1 ) @@ -618,17 +596,6 @@ async def test_validate_schema_and_update_table_activity_with_existing(activity_ ) test_1_schema = await _create_schema("test-1", new_source, team, table_id=existing_table.id) - test_2_schema = await _create_schema("test-2", new_source, team) - test_3_schema = await _create_schema("test-3", new_source, team) - test_4_schema = await _create_schema("test-4", new_source, team) - test_5_schema = await _create_schema("test-5", new_source, team) - schemas = [ - (test_1_schema.id, "test-1"), - (test_2_schema.id, "test-2"), - (test_3_schema.id, "test-3"), - (test_4_schema.id, "test-4"), - (test_5_schema.id, "test-5"), - ] with mock.patch( "posthog.warehouse.models.table.DataWarehouseTable.get_columns" @@ -639,21 +606,17 @@ async def test_validate_schema_and_update_table_activity_with_existing(activity_ ValidateSchemaInputs( run_id=new_job.pk, team_id=team.id, - schemas=schemas, + schema_id=test_1_schema.id, table_schema={ "test-1": {"name": "test-1", "resource": "test-1", "columns": {"id": {"data_type": "text"}}}, - "test-2": {"name": "test-2", 
"resource": "test-2", "columns": {"id": {"data_type": "text"}}}, - "test-3": {"name": "test-3", "resource": "test-3", "columns": {"id": {"data_type": "text"}}}, - "test-4": {"name": "test-4", "resource": "test-4", "columns": {"id": {"data_type": "text"}}}, - "test-5": {"name": "test-5", "resource": "test-5", "columns": {"id": {"data_type": "text"}}}, }, table_row_counts={}, ), ) - assert mock_get_columns.call_count == 10 + assert mock_get_columns.call_count == 2 assert ( - await sync_to_async(DataWarehouseTable.objects.filter(external_data_source_id=new_source.pk).count)() == 5 + await sync_to_async(DataWarehouseTable.objects.filter(external_data_source_id=new_source.pk).count)() == 1 ) @@ -699,34 +662,27 @@ async def test_validate_schema_and_update_table_activity_half_run(activity_envir ] broken_schema = await _create_schema("broken_schema", new_source, team) - test_schema = await _create_schema("test_schema", new_source, team) - schemas = [(broken_schema.id, "broken_schema"), (test_schema.id, "test_schema")] await activity_environment.run( validate_schema_activity, ValidateSchemaInputs( run_id=new_job.pk, team_id=team.id, - schemas=schemas, + schema_id=broken_schema.id, table_schema={ "broken_schema": { "name": "broken_schema", "resource": "broken_schema", "columns": {"id": {"data_type": "text"}}, }, - "test_schema": { - "name": "test_schema", - "resource": "test_schema", - "columns": {"id": {"data_type": "text"}}, - }, }, table_row_counts={}, ), ) - assert mock_get_columns.call_count == 1 + assert mock_get_columns.call_count == 0 assert ( - await sync_to_async(DataWarehouseTable.objects.filter(external_data_source_id=new_source.pk).count)() == 1 + await sync_to_async(DataWarehouseTable.objects.filter(external_data_source_id=new_source.pk).count)() == 0 ) @@ -751,17 +707,6 @@ async def test_create_schema_activity(activity_environment, team, **kwargs): ) test_1_schema = await _create_schema("test-1", new_source, team) - test_2_schema = await _create_schema("test-2", new_source, team) - test_3_schema = await _create_schema("test-3", new_source, team) - test_4_schema = await _create_schema("test-4", new_source, team) - test_5_schema = await _create_schema("test-5", new_source, team) - schemas = [ - (test_1_schema.id, "test-1"), - (test_2_schema.id, "test-2"), - (test_3_schema.id, "test-3"), - (test_4_schema.id, "test-4"), - (test_5_schema.id, "test-5"), - ] with mock.patch( "posthog.warehouse.models.table.DataWarehouseTable.get_columns" @@ -772,30 +717,25 @@ async def test_create_schema_activity(activity_environment, team, **kwargs): ValidateSchemaInputs( run_id=new_job.pk, team_id=team.id, - schemas=schemas, + schema_id=test_1_schema.id, table_schema={ "test-1": {"name": "test-1", "resource": "test-1", "columns": {"id": {"data_type": "text"}}}, - "test-2": {"name": "test-2", "resource": "test-2", "columns": {"id": {"data_type": "text"}}}, - "test-3": {"name": "test-3", "resource": "test-3", "columns": {"id": {"data_type": "text"}}}, - "test-4": {"name": "test-4", "resource": "test-4", "columns": {"id": {"data_type": "text"}}}, - "test-5": {"name": "test-5", "resource": "test-5", "columns": {"id": {"data_type": "text"}}}, }, table_row_counts={}, ), ) - assert mock_get_columns.call_count == 10 + assert mock_get_columns.call_count == 2 all_tables = DataWarehouseTable.objects.all() table_length = await sync_to_async(len)(all_tables) - assert table_length == 5 + assert table_length == 1 @pytest.mark.django_db(transaction=True) @pytest.mark.asyncio -async def 
test_external_data_job_workflow_blank(team, **kwargs): +async def test_external_data_job_workflow_with_schema(team, **kwargs): """ - Test workflow with no schema. - Smoke test for making sure all activities run. + Test workflow with schema. """ new_source = await sync_to_async(ExternalDataSource.objects.create)( source_id=uuid.uuid4(), @@ -807,71 +747,19 @@ async def test_external_data_job_workflow_blank(team, **kwargs): job_inputs={"stripe_secret_key": "test-key"}, ) - workflow_id = str(uuid.uuid4()) - inputs = ExternalDataWorkflowInputs( + schema = await sync_to_async(ExternalDataSchema.objects.create)( + name="Customer", team_id=team.id, - external_data_source_id=new_source.pk, - ) - - with override_settings(AIRBYTE_BUCKET_KEY="test-key", AIRBYTE_BUCKET_SECRET="test-secret"): - with mock.patch.dict(PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING, {ExternalDataSource.Type.STRIPE: ()}): - async with await WorkflowEnvironment.start_time_skipping() as activity_environment: - async with Worker( - activity_environment.client, - task_queue=DATA_WAREHOUSE_TASK_QUEUE, - workflows=[ExternalDataJobWorkflow], - activities=[ - create_external_data_job_model, - update_external_data_job_model, - run_external_data_job, - validate_schema_activity, - create_source_templates, - ], - workflow_runner=UnsandboxedWorkflowRunner(), - ): - await activity_environment.client.execute_workflow( - ExternalDataJobWorkflow.run, - inputs, - id=workflow_id, - task_queue=DATA_WAREHOUSE_TASK_QUEUE, - retry_policy=RetryPolicy(maximum_attempts=1), - ) - - run = await get_latest_run_if_exists(team_id=team.pk, pipeline_id=new_source.pk) - assert run is not None - assert run.status == ExternalDataJob.Status.COMPLETED - - -@pytest.mark.django_db(transaction=True) -@pytest.mark.asyncio -async def test_external_data_job_workflow_with_schema(team, **kwargs): - """ - Test workflow with schema. 
- """ - new_source = await sync_to_async(ExternalDataSource.objects.create)( - source_id=uuid.uuid4(), - connection_id=uuid.uuid4(), - destination_id=uuid.uuid4(), - team=team, - status="running", - source_type="Stripe", - job_inputs={"stripe_secret_key": "test-key"}, + source_id=new_source.pk, ) workflow_id = str(uuid.uuid4()) inputs = ExternalDataWorkflowInputs( team_id=team.id, external_data_source_id=new_source.pk, + external_data_schema_id=schema.id, ) - schemas = PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING[new_source.source_type] - for schema in schemas: - await sync_to_async(ExternalDataSchema.objects.create)( - name=schema, - team_id=team.id, - source_id=new_source.pk, - ) - async def mock_async_func(inputs): return {} @@ -885,9 +773,10 @@ async def mock_async_func(inputs): task_queue=DATA_WAREHOUSE_TASK_QUEUE, workflows=[ExternalDataJobWorkflow], activities=[ - create_external_data_job_model, + check_schedule_activity, + create_external_data_job_model_activity, update_external_data_job_model, - run_external_data_job, + import_data_activity, validate_schema_activity, create_source_templates, ], @@ -906,9 +795,7 @@ async def mock_async_func(inputs): assert run is not None assert run.status == ExternalDataJob.Status.COMPLETED - assert await sync_to_async(DataWarehouseTable.objects.filter(external_data_source_id=new_source.pk).count)() == len( - PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING[new_source.source_type] - ) + assert await sync_to_async(DataWarehouseTable.objects.filter(external_data_source_id=new_source.pk).count)() == 1 @pytest.mark.django_db(transaction=True) @@ -952,12 +839,9 @@ async def setup_job_1(): new_job = await sync_to_async(ExternalDataJob.objects.filter(id=new_job.id).prefetch_related("pipeline").get)() posthog_test_schema = await _create_schema("posthog_test", new_source, team) - schemas = [(posthog_test_schema.id, "posthog_test")] - inputs = ExternalDataJobInputs( - team_id=team.id, - run_id=new_job.pk, - source_id=new_source.pk, - schemas=schemas, + + inputs = ImportDataActivityInputs( + team_id=team.id, run_id=new_job.pk, source_id=new_source.pk, schema_id=posthog_test_schema.id ) return new_job, inputs @@ -970,7 +854,7 @@ async def setup_job_1(): AIRBYTE_BUCKET_SECRET=settings.OBJECT_STORAGE_SECRET_ACCESS_KEY, ): await asyncio.gather( - activity_environment.run(run_external_data_job, job_1_inputs), + activity_environment.run(import_data_activity, job_1_inputs), ) job_1_team_objects = await minio_client.list_objects_v2( diff --git a/posthog/warehouse/models/external_data_schema.py b/posthog/warehouse/models/external_data_schema.py index bc39b96359e76..385aaa0d1664a 100644 --- a/posthog/warehouse/models/external_data_schema.py +++ b/posthog/warehouse/models/external_data_schema.py @@ -61,7 +61,7 @@ def get_active_schemas_for_source_id(source_id: uuid.UUID, team_id: int): def get_all_schemas_for_source_id(source_id: uuid.UUID, team_id: int): - return ExternalDataSchema.objects.filter(team_id=team_id, source_id=source_id).all() + return list(ExternalDataSchema.objects.filter(team_id=team_id, source_id=source_id).all()) def sync_old_schemas_with_new_schemas(new_schemas: list, source_id: uuid.UUID, team_id: int): From 6a4fc4880dbbfc1a87470096b52f4517ff48598d Mon Sep 17 00:00:00 2001 From: Tom Owers Date: Fri, 12 Apr 2024 15:13:13 +0100 Subject: [PATCH 05/26] Added new tests to cover check_schedule_activity --- .../external_data/test_external_data_job.py | 107 ++++++++++++++++++ 1 file changed, 107 insertions(+) diff --git 
a/posthog/temporal/tests/external_data/test_external_data_job.py b/posthog/temporal/tests/external_data/test_external_data_job.py index b3907d2f3fa0d..44470724a9c5f 100644 --- a/posthog/temporal/tests/external_data/test_external_data_job.py +++ b/posthog/temporal/tests/external_data/test_external_data_job.py @@ -861,3 +861,110 @@ async def setup_job_1(): Bucket=BUCKET_NAME, Prefix=f"{job_1.folder_path}/posthog_test/" ) assert len(job_1_team_objects["Contents"]) == 1 + + +@pytest.mark.django_db(transaction=True) +@pytest.mark.asyncio +async def test_check_schedule_activity_with_schema_id(activity_environment, team, **kwargs): + new_source = await sync_to_async(ExternalDataSource.objects.create)( + source_id=uuid.uuid4(), + connection_id=uuid.uuid4(), + destination_id=uuid.uuid4(), + team=team, + status="running", + source_type="Stripe", + job_inputs={"stripe_secret_key": "test-key"}, + ) + + test_1_schema = await _create_schema("test-1", new_source, team) + + should_exit = await activity_environment.run( + check_schedule_activity, + ExternalDataWorkflowInputs( + team_id=team.id, + external_data_source_id=new_source.id, + external_data_schema_id=test_1_schema.id, + ), + ) + + assert should_exit is False + + +@pytest.mark.django_db(transaction=True) +@pytest.mark.asyncio +async def test_check_schedule_activity_with_missing_schema_id_but_with_schedule(activity_environment, team, **kwargs): + new_source = await sync_to_async(ExternalDataSource.objects.create)( + source_id=uuid.uuid4(), + connection_id=uuid.uuid4(), + destination_id=uuid.uuid4(), + team=team, + status="running", + source_type="Stripe", + job_inputs={"stripe_secret_key": "test-key"}, + ) + + await sync_to_async(ExternalDataSchema.objects.create)( + name="test-1", + team_id=team.id, + source_id=new_source.pk, + should_sync=True, + ) + + with mock.patch( + "posthog.temporal.data_imports.external_data_job.a_external_data_workflow_exists", return_value=True + ), mock.patch( + "posthog.temporal.data_imports.external_data_job.a_delete_external_data_schedule", return_value=True + ), mock.patch( + "posthog.temporal.data_imports.external_data_job.a_trigger_external_data_workflow" + ) as mock_a_trigger_external_data_workflow: + should_exit = await activity_environment.run( + check_schedule_activity, + ExternalDataWorkflowInputs( + team_id=team.id, + external_data_source_id=new_source.id, + external_data_schema_id=None, + ), + ) + + assert should_exit is True + assert mock_a_trigger_external_data_workflow.call_count == 1 + + +@pytest.mark.django_db(transaction=True) +@pytest.mark.asyncio +async def test_check_schedule_activity_with_missing_schema_id_and_no_schedule(activity_environment, team, **kwargs): + new_source = await sync_to_async(ExternalDataSource.objects.create)( + source_id=uuid.uuid4(), + connection_id=uuid.uuid4(), + destination_id=uuid.uuid4(), + team=team, + status="running", + source_type="Stripe", + job_inputs={"stripe_secret_key": "test-key"}, + ) + + await sync_to_async(ExternalDataSchema.objects.create)( + name="test-1", + team_id=team.id, + source_id=new_source.pk, + should_sync=True, + ) + + with mock.patch( + "posthog.temporal.data_imports.external_data_job.a_external_data_workflow_exists", return_value=False + ), mock.patch( + "posthog.temporal.data_imports.external_data_job.a_delete_external_data_schedule", return_value=True + ), mock.patch( + "posthog.temporal.data_imports.external_data_job.a_sync_external_data_job_workflow" + ) as mock_a_sync_external_data_job_workflow: + should_exit = await 
activity_environment.run( + check_schedule_activity, + ExternalDataWorkflowInputs( + team_id=team.id, + external_data_source_id=new_source.id, + external_data_schema_id=None, + ), + ) + + assert should_exit is True + assert mock_a_sync_external_data_job_workflow.call_count == 1 From e92c9a0dc74c47597cf2ceae7cbe63480a9e4484 Mon Sep 17 00:00:00 2001 From: Tom Owers Date: Fri, 12 Apr 2024 15:32:28 +0100 Subject: [PATCH 06/26] Updated the source API to trigger active schemas --- posthog/warehouse/api/external_data_source.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/posthog/warehouse/api/external_data_source.py b/posthog/warehouse/api/external_data_source.py index bd181390420ff..7a2cd91d1a5b1 100644 --- a/posthog/warehouse/api/external_data_source.py +++ b/posthog/warehouse/api/external_data_source.py @@ -192,13 +192,20 @@ def create(self, request: Request, *args: Any, **kwargs: Any) -> Response: disabled_schemas = [schema for schema in default_schemas if schema not in enabled_schemas] + active_schemas: List[ExternalDataSchema] = [] + for schema in enabled_schemas: - ExternalDataSchema.objects.create(name=schema, team=self.team, source=new_source_model, should_sync=True) + active_schemas.append( + ExternalDataSchema.objects.create( + name=schema, team=self.team, source=new_source_model, should_sync=True + ) + ) for schema in disabled_schemas: ExternalDataSchema.objects.create(name=schema, team=self.team, source=new_source_model, should_sync=False) try: - sync_external_data_job_workflow(new_source_model, create=True) + for active_schema in active_schemas: + sync_external_data_job_workflow(active_schema, create=True) except Exception as e: # Log error but don't fail because the source model was already created logger.exception("Could not trigger external data job", exc_info=e) From d389cc7a735748d0b3000681442b842bf1f0e750 Mon Sep 17 00:00:00 2001 From: Tom Owers Date: Fri, 12 Apr 2024 15:36:34 +0100 Subject: [PATCH 07/26] Added master changes for stripe source --- .../workflow_activities/import_data.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/posthog/temporal/data_imports/workflow_activities/import_data.py b/posthog/temporal/data_imports/workflow_activities/import_data.py index 98ac8d60fa468..9c78555dfe93c 100644 --- a/posthog/temporal/data_imports/workflow_activities/import_data.py +++ b/posthog/temporal/data_imports/workflow_activities/import_data.py @@ -1,4 +1,5 @@ import dataclasses +import datetime as dt import uuid from dlt.common.schema.typing import TSchemaTables @@ -9,6 +10,7 @@ from posthog.temporal.data_imports.pipelines.zendesk.credentials import ZendeskCredentialsToken from posthog.temporal.data_imports.pipelines.pipeline import DataImportPipeline, PipelineInputs +from posthog.utils import get_instance_region from posthog.warehouse.models import ( ExternalDataJob, ExternalDataSource, @@ -18,6 +20,7 @@ from typing import Dict, Tuple import asyncio from django.conf import settings +from django.utils import timezone from posthog.warehouse.models.external_data_schema import ExternalDataSchema, aget_schema_by_id @@ -61,12 +64,25 @@ async def import_data_activity(inputs: ImportDataActivityInputs) -> Tuple[TSchem # until we require re update of account_ids in stripe so they're all store if not stripe_secret_key: raise ValueError(f"Stripe secret key not found for job {model.id}") + + # Hacky just for specific user + region = get_instance_region() + if region == "EU" and inputs.team_id == 11870: + prev_day = timezone.now() - 
dt.timedelta(days=1) + start_date = prev_day.replace(hour=0, minute=0, second=0, microsecond=0) + end_date = start_date + dt.timedelta(1) + else: + start_date = None + end_date = None + source = stripe_source( api_key=stripe_secret_key, account_id=account_id, endpoints=tuple(endpoints), team_id=inputs.team_id, job_id=inputs.run_id, + start_date=start_date, + end_date=end_date, ) elif model.pipeline.source_type == ExternalDataSource.Type.HUBSPOT: from posthog.temporal.data_imports.pipelines.hubspot.auth import refresh_access_token From 5106718eab75feed5797aefc397737a07339c9d2 Mon Sep 17 00:00:00 2001 From: Tom Owers Date: Fri, 12 Apr 2024 15:43:09 +0100 Subject: [PATCH 08/26] Updated mypy --- mypy-baseline.txt | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/mypy-baseline.txt b/mypy-baseline.txt index a82ceecb185ce..4a34b1bf31a1f 100644 --- a/mypy-baseline.txt +++ b/mypy-baseline.txt @@ -2,7 +2,6 @@ posthog/temporal/common/utils.py:0: error: Argument 1 to "abstractclassmethod" h posthog/temporal/common/utils.py:0: note: This is likely because "from_activity" has named arguments: "cls". Consider marking them positional-only posthog/temporal/common/utils.py:0: error: Argument 2 to "__get__" of "classmethod" has incompatible type "type[HeartbeatType]"; expected "type[Never]" [arg-type] posthog/temporal/data_imports/pipelines/zendesk/talk_api.py:0: error: Incompatible types in assignment (expression has type "None", variable has type "str") [assignment] -posthog/hogql/modifiers.py:0: error: Incompatible types in assignment (expression has type "PersonOnEventsMode", variable has type "PersonsOnEventsMode | None") [assignment] posthog/hogql/database/argmax.py:0: error: Argument "chain" to "Field" has incompatible type "list[str]"; expected "list[str | int]" [arg-type] posthog/hogql/database/argmax.py:0: note: "List" is invariant -- see https://mypy.readthedocs.io/en/stable/common_issues.html#variance posthog/hogql/database/argmax.py:0: note: Consider using "Sequence" instead, which is covariant @@ -138,6 +137,7 @@ posthog/hogql_queries/legacy_compatibility/filter_to_query.py:0: error: Dict ent posthog/hogql_queries/legacy_compatibility/filter_to_query.py:0: error: Dict entry 0 has incompatible type "str": "LifecycleFilter"; expected "str": "TrendsFilter" [dict-item] posthog/hogql_queries/legacy_compatibility/filter_to_query.py:0: error: Dict entry 0 has incompatible type "str": "StickinessFilter"; expected "str": "TrendsFilter" [dict-item] posthog/hogql_queries/legacy_compatibility/feature_flag.py:0: error: Item "AnonymousUser" of "User | AnonymousUser" has no attribute "email" [union-attr] +posthog/hogql/modifiers.py:0: error: Incompatible types in assignment (expression has type "PersonOnEventsMode", variable has type "PersonsOnEventsMode | None") [assignment] posthog/hogql/functions/cohort.py:0: error: Argument 1 to "escape_clickhouse_string" has incompatible type "str | None"; expected "float | int | str | list[Any] | tuple[Any, ...] | date | datetime | UUID | UUIDT" [arg-type] posthog/hogql/functions/cohort.py:0: error: Argument 1 to "escape_clickhouse_string" has incompatible type "str | None"; expected "float | int | str | list[Any] | tuple[Any, ...] 
| date | datetime | UUID | UUIDT" [arg-type] posthog/hogql/functions/cohort.py:0: error: Incompatible types in assignment (expression has type "ValuesQuerySet[Cohort, tuple[int, bool | None]]", variable has type "ValuesQuerySet[Cohort, tuple[int, bool | None, str | None]]") [assignment] @@ -174,13 +174,6 @@ posthog/hogql_queries/insights/trends/aggregation_operations.py:0: error: Item " posthog/hogql_queries/insights/trends/aggregation_operations.py:0: error: Item "None" of "list[Expr] | Any | None" has no attribute "append" [union-attr] ee/billing/billing_manager.py:0: error: TypedDict "CustomerInfo" has no key "available_product_features" [typeddict-item] ee/billing/billing_manager.py:0: note: Did you mean "available_features"? -posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "DateTime | Date | datetime | date | str | float | int | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type] -posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "str | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type] -posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "DateTime | Date | datetime | date | str | float | int | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type] -posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "str | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type] -posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "DateTime | Date | datetime | date | str | float | int | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type] -posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Item "None" of "DateTime | None" has no attribute "int_timestamp" [union-attr] -posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "str | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type] posthog/hogql/resolver.py:0: error: Argument 1 of "visit" is incompatible with supertype "Visitor"; supertype defines the argument type as "AST" [override] posthog/hogql/resolver.py:0: note: This violates the Liskov substitution principle posthog/hogql/resolver.py:0: note: See https://mypy.readthedocs.io/en/stable/common_issues.html#incompatible-overrides @@ -240,9 +233,6 @@ posthog/hogql/resolver.py:0: error: Argument 1 to "get_child" of "Type" has inco posthog/hogql/resolver.py:0: error: Incompatible types in assignment (expression has type "Expr", variable has type "Alias") [assignment] posthog/hogql/resolver.py:0: error: Argument "alias" to "Alias" has incompatible type "str | int"; expected "str" [arg-type] posthog/hogql/resolver.py:0: error: Argument 1 to "join" of "str" has incompatible type "list[str | int]"; expected "Iterable[str]" [arg-type] -posthog/temporal/data_imports/external_data_job.py:0: error: Argument "team_id" has incompatible type "int"; expected "str" [arg-type] -posthog/temporal/data_imports/external_data_job.py:0: error: Unused "type: ignore" comment [unused-ignore] -posthog/temporal/data_imports/external_data_job.py:0: error: 
Argument "team_id" has incompatible type "int"; expected "str" [arg-type] posthog/hogql/transforms/lazy_tables.py:0: error: Incompatible default for argument "context" (default has type "None", argument has type "HogQLContext") [assignment] posthog/hogql/transforms/lazy_tables.py:0: note: PEP 484 prohibits implicit Optional. Accordingly, mypy has changed its default to no_implicit_optional=True posthog/hogql/transforms/lazy_tables.py:0: note: Use https://github.com/hauntsaninja/no_implicit_optional to automatically upgrade your codebase @@ -588,6 +578,15 @@ posthog/hogql/database/schema/event_sessions.py:0: error: Statement is unreachab posthog/api/organization_member.py:0: error: Metaclass conflict: the metaclass of a derived class must be a (non-strict) subclass of the metaclasses of all its bases [misc] ee/api/role.py:0: error: Metaclass conflict: the metaclass of a derived class must be a (non-strict) subclass of the metaclasses of all its bases [misc] ee/clickhouse/views/insights.py:0: error: Metaclass conflict: the metaclass of a derived class must be a (non-strict) subclass of the metaclasses of all its bases [misc] +posthog/temporal/data_imports/workflow_activities/create_job_model.py:0: error: Argument 6 has incompatible type "ExternalDataSchema"; expected "str" [arg-type] +posthog/temporal/data_imports/workflow_activities/create_job_model.py:0: error: Unused "type: ignore" comment [unused-ignore] +posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "DateTime | Date | datetime | date | str | float | int | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type] +posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "str | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type] +posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "DateTime | Date | datetime | date | str | float | int | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type] +posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "str | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type] +posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "DateTime | Date | datetime | date | str | float | int | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type] +posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Item "None" of "DateTime | None" has no attribute "int_timestamp" [union-attr] +posthog/temporal/data_imports/pipelines/zendesk/helpers.py:0: error: Argument 1 to "ensure_pendulum_datetime" has incompatible type "str | None"; expected "DateTime | Date | datetime | date | str | float | int" [arg-type] posthog/queries/trends/test/test_person.py:0: error: "str" has no attribute "get" [attr-defined] posthog/queries/trends/test/test_person.py:0: error: Invalid index type "int" for "HttpResponse"; expected type "str | bytes" [index] posthog/queries/trends/test/test_person.py:0: error: "str" has no attribute "get" [attr-defined] From 6e26830dbade62697280bf1bbf28137b1b49c9f0 Mon Sep 17 00:00:00 2001 From: eric Date: Fri, 12 Apr 2024 14:43:08 -0400 Subject: [PATCH 09/26] add blank to 
field --- posthog/migrations/0401_externaldatajob_schema.py | 2 +- posthog/warehouse/models/external_data_job.py | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/posthog/migrations/0401_externaldatajob_schema.py b/posthog/migrations/0401_externaldatajob_schema.py index 09ce3b5f86874..183004299ce1c 100644 --- a/posthog/migrations/0401_externaldatajob_schema.py +++ b/posthog/migrations/0401_externaldatajob_schema.py @@ -20,6 +20,6 @@ class Migration(migrations.Migration): migrations.AddField( model_name="externaldataschema", name="status", - field=models.CharField(max_length=400, null=True), + field=models.CharField(max_length=400, null=True, blank=True), ), ] diff --git a/posthog/warehouse/models/external_data_job.py b/posthog/warehouse/models/external_data_job.py index c7ca3b83933af..b095f8667d934 100644 --- a/posthog/warehouse/models/external_data_job.py +++ b/posthog/warehouse/models/external_data_job.py @@ -16,7 +16,9 @@ class Status(models.TextChoices): team: models.ForeignKey = models.ForeignKey(Team, on_delete=models.CASCADE) pipeline: models.ForeignKey = models.ForeignKey("posthog.ExternalDataSource", on_delete=models.CASCADE) - schema: models.ForeignKey = models.ForeignKey("posthog.ExternalDataSchema", on_delete=models.CASCADE, null=True) + schema: models.ForeignKey = models.ForeignKey( + "posthog.ExternalDataSchema", on_delete=models.CASCADE, null=True, blank=True + ) status: models.CharField = models.CharField(max_length=400) rows_synced: models.BigIntegerField = models.BigIntegerField(null=True, blank=True) latest_error: models.TextField = models.TextField( From 37251b95e0c92eef38e7b8ceb4d44b78fd183e5e Mon Sep 17 00:00:00 2001 From: eric Date: Mon, 15 Apr 2024 10:33:27 -0400 Subject: [PATCH 10/26] update migrations --- posthog/migrations/0401_externaldatajob_schema.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/posthog/migrations/0401_externaldatajob_schema.py b/posthog/migrations/0401_externaldatajob_schema.py index 183004299ce1c..5ea17064fc97d 100644 --- a/posthog/migrations/0401_externaldatajob_schema.py +++ b/posthog/migrations/0401_externaldatajob_schema.py @@ -1,4 +1,4 @@ -# Generated by Django 4.1.13 on 2024-04-10 15:12 +# Generated by Django 4.1.13 on 2024-04-15 14:32 from django.db import migrations, models import django.db.models.deletion @@ -14,12 +14,12 @@ class Migration(migrations.Migration): model_name="externaldatajob", name="schema", field=models.ForeignKey( - null=True, on_delete=django.db.models.deletion.CASCADE, to="posthog.externaldataschema" + blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to="posthog.externaldataschema" ), ), migrations.AddField( model_name="externaldataschema", name="status", - field=models.CharField(max_length=400, null=True, blank=True), + field=models.CharField(max_length=400, null=True), ), ] From 18d7547e9f164ef2d621735c1ca531815161c416 Mon Sep 17 00:00:00 2001 From: eric Date: Tue, 16 Apr 2024 15:34:27 -0400 Subject: [PATCH 11/26] update mypy --- mypy-baseline.txt | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/mypy-baseline.txt b/mypy-baseline.txt index ef676831164d6..49a9da6e550e7 100644 --- a/mypy-baseline.txt +++ b/mypy-baseline.txt @@ -1,6 +1,7 @@ posthog/temporal/common/utils.py:0: error: Argument 1 to "abstractclassmethod" has incompatible type "Callable[[HeartbeatDetails, Any], Any]"; expected "Callable[[type[Never], Any], Any]" [arg-type] posthog/temporal/common/utils.py:0: note: This is likely because "from_activity" has named 
arguments: "cls". Consider marking them positional-only posthog/temporal/common/utils.py:0: error: Argument 2 to "__get__" of "classmethod" has incompatible type "type[HeartbeatType]"; expected "type[Never]" [arg-type] +posthog/temporal/data_imports/pipelines/postgres/helpers.py:0: error: Attributes without a default cannot follow attributes with one [misc] posthog/temporal/data_imports/pipelines/zendesk/talk_api.py:0: error: Incompatible types in assignment (expression has type "None", variable has type "str") [assignment] posthog/hogql/database/argmax.py:0: error: Argument "chain" to "Field" has incompatible type "list[str]"; expected "list[str | int]" [arg-type] posthog/hogql/database/argmax.py:0: note: "List" is invariant -- see https://mypy.readthedocs.io/en/stable/common_issues.html#variance @@ -131,8 +132,9 @@ posthog/hogql_queries/legacy_compatibility/filter_to_query.py:0: error: Argument posthog/hogql_queries/legacy_compatibility/filter_to_query.py:0: error: Argument 1 to "to_base_entity_dict" has incompatible type "Any | None"; expected "dict[Any, Any]" [arg-type] posthog/hogql_queries/legacy_compatibility/filter_to_query.py:0: error: Dict entry 0 has incompatible type "str": "PathsFilter"; expected "str": "TrendsFilter" [dict-item] posthog/hogql_queries/legacy_compatibility/filter_to_query.py:0: error: Dict entry 0 has incompatible type "str": "LifecycleFilter"; expected "str": "TrendsFilter" [dict-item] -posthog/hogql_queries/legacy_mycompatibility/filter_to_query.py:0: error: Dict entry 0 has incompatible type "str": "StickinessFilter"; expected "str": "TrendsFilter" [dict-item] +posthog/hogql_queries/legacy_compatibility/filter_to_query.py:0: error: Dict entry 0 has incompatible type "str": "StickinessFilter"; expected "str": "TrendsFilter" [dict-item] posthog/hogql_queries/legacy_compatibility/feature_flag.py:0: error: Item "AnonymousUser" of "User | AnonymousUser" has no attribute "email" [union-attr] +posthog/hogql/modifiers.py:0: error: Incompatible types in assignment (expression has type "PersonOnEventsMode", variable has type "PersonsOnEventsMode | None") [assignment] posthog/api/utils.py:0: error: Incompatible types in assignment (expression has type "type[EventDefinition]", variable has type "type[EnterpriseEventDefinition]") [assignment] posthog/api/utils.py:0: error: Argument 1 to "UUID" has incompatible type "int | str"; expected "str | None" [arg-type] ee/billing/quota_limiting.py:0: error: List comprehension has incompatible type List[int]; expected List[str] [misc] @@ -699,6 +701,7 @@ posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py:0: posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py:0: error: List item 0 has incompatible type "tuple[str, str, int, int, int, int, str, int]"; expected "tuple[str, str, int, int, str, str, str, str]" [list-item] posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: "tuple[Any, ...]" has no attribute "last_uploaded_part_timestamp" [attr-defined] posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: "tuple[Any, ...]" has no attribute "upload_state" [attr-defined] +posthog/temporal/data_imports/workflow_activities/import_data.py:0: error: Missing positional arguments "subdomain", "email", "token" in call to "ZendeskCredentialsToken" [call-arg] posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py:0: error: Incompatible types in assignment (expression has type "str | int", variable has type "int") 
[assignment]
 posthog/api/test/batch_exports/conftest.py:0: error: Argument "activities" to "ThreadedWorker" has incompatible type "list[function]"; expected "Sequence[Callable[..., Any]]" [arg-type]
 posthog/management/commands/test/test_create_batch_export_from_app.py:0: error: Incompatible return value type (got "dict[str, Collection[str]]", expected "dict[str, str]") [return-value]

From b7d60ac0cea62d078c894c288e675b66d58853c8 Mon Sep 17 00:00:00 2001
From: eric
Date: Tue, 16 Apr 2024 16:26:27 -0400
Subject: [PATCH 12/26] fix types

---
 mypy-baseline.txt | 2 --
 .../temporal/data_imports/pipelines/postgres/helpers.py | 2 +-
 .../data_imports/workflow_activities/import_data.py | 9 +++++----
 3 files changed, 6 insertions(+), 7 deletions(-)

diff --git a/mypy-baseline.txt b/mypy-baseline.txt
index 49a9da6e550e7..713cf17200a43 100644
--- a/mypy-baseline.txt
+++ b/mypy-baseline.txt
@@ -1,7 +1,6 @@
 posthog/temporal/common/utils.py:0: error: Argument 1 to "abstractclassmethod" has incompatible type "Callable[[HeartbeatDetails, Any], Any]"; expected "Callable[[type[Never], Any], Any]" [arg-type]
 posthog/temporal/common/utils.py:0: note: This is likely because "from_activity" has named arguments: "cls". Consider marking them positional-only
 posthog/temporal/common/utils.py:0: error: Argument 2 to "__get__" of "classmethod" has incompatible type "type[HeartbeatType]"; expected "type[Never]" [arg-type]
-posthog/temporal/data_imports/pipelines/postgres/helpers.py:0: error: Attributes without a default cannot follow attributes with one [misc]
 posthog/temporal/data_imports/pipelines/zendesk/talk_api.py:0: error: Incompatible types in assignment (expression has type "None", variable has type "str") [assignment]
 posthog/hogql/database/argmax.py:0: error: Argument "chain" to "Field" has incompatible type "list[str]"; expected "list[str | int]" [arg-type]
 posthog/hogql/database/argmax.py:0: note: "List" is invariant -- see https://mypy.readthedocs.io/en/stable/common_issues.html#variance
@@ -701,7 +700,6 @@ posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py:0:
 posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py:0: error: List item 0 has incompatible type "tuple[str, str, int, int, int, int, str, int]"; expected "tuple[str, str, int, int, str, str, str, str]" [list-item]
 posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: "tuple[Any, ...]" has no attribute "last_uploaded_part_timestamp" [attr-defined]
 posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: "tuple[Any, ...]" has no attribute "upload_state" [attr-defined]
-posthog/temporal/data_imports/workflow_activities/import_data.py:0: error: Missing positional arguments "subdomain", "email", "token" in call to "ZendeskCredentialsToken" [call-arg]
 posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py:0: error: Incompatible types in assignment (expression has type "str | int", variable has type "int") [assignment]
 posthog/api/test/batch_exports/conftest.py:0: error: Argument "activities" to "ThreadedWorker" has incompatible type "list[function]"; expected "Sequence[Callable[..., Any]]" [arg-type]
 posthog/management/commands/test/test_create_batch_export_from_app.py:0: error: Incompatible return value type (got "dict[str, Collection[str]]", expected "dict[str, str]") [return-value]
diff --git a/posthog/temporal/data_imports/pipelines/postgres/helpers.py b/posthog/temporal/data_imports/pipelines/postgres/helpers.py
index
7d45a6df7e302..a288205063f15 100644 --- a/posthog/temporal/data_imports/pipelines/postgres/helpers.py +++ b/posthog/temporal/data_imports/pipelines/postgres/helpers.py @@ -117,8 +117,8 @@ class SqlDatabaseTableConfiguration(BaseConfiguration): class SqlTableResourceConfiguration(BaseConfiguration): credentials: ConnectionStringCredentials table: str - incremental: Optional[dlt.sources.incremental] = None schema: Optional[str] + incremental: Optional[dlt.sources.incremental] = None __source_name__ = "sql_database" diff --git a/posthog/temporal/data_imports/workflow_activities/import_data.py b/posthog/temporal/data_imports/workflow_activities/import_data.py index 9c78555dfe93c..3c1f3ac3214c9 100644 --- a/posthog/temporal/data_imports/workflow_activities/import_data.py +++ b/posthog/temporal/data_imports/workflow_activities/import_data.py @@ -124,10 +124,11 @@ async def import_data_activity(inputs: ImportDataActivityInputs) -> Tuple[TSchem elif model.pipeline.source_type == ExternalDataSource.Type.ZENDESK: from posthog.temporal.data_imports.pipelines.zendesk.helpers import zendesk_support - credentials = ZendeskCredentialsToken() - credentials.token = model.pipeline.job_inputs.get("zendesk_api_key") - credentials.subdomain = model.pipeline.job_inputs.get("zendesk_subdomain") - credentials.email = model.pipeline.job_inputs.get("zendesk_email_address") + credentials = ZendeskCredentialsToken( + token=model.pipeline.job_inputs.get("zendesk_api_key"), + subdomain=model.pipeline.job_inputs.get("zendesk_subdomain"), + email=model.pipeline.job_inputs.get("zendesk_email_address"), + ) data_support = zendesk_support(credentials=credentials, endpoints=tuple(endpoints), team_id=inputs.team_id) # Uncomment to support zendesk chat and talk From 9d6d4087c6cc1bd05114a261981e46d46d38c263 Mon Sep 17 00:00:00 2001 From: github-actions <41898282+github-actions[bot]@users.noreply.github.com> Date: Tue, 16 Apr 2024 20:37:16 +0000 Subject: [PATCH 13/26] Update query snapshots --- .../api/test/__snapshots__/test_decide.ambr | 37 ++++++++----------- 1 file changed, 16 insertions(+), 21 deletions(-) diff --git a/posthog/api/test/__snapshots__/test_decide.ambr b/posthog/api/test/__snapshots__/test_decide.ambr index b60f0660121b9..ae8f9966d1136 100644 --- a/posthog/api/test/__snapshots__/test_decide.ambr +++ b/posthog/api/test/__snapshots__/test_decide.ambr @@ -142,6 +142,17 @@ ''' # --- # name: TestDecide.test_decide_doesnt_error_out_when_database_is_down.3 + ''' + SELECT "posthog_instancesetting"."id", + "posthog_instancesetting"."key", + "posthog_instancesetting"."raw_value" + FROM "posthog_instancesetting" + WHERE "posthog_instancesetting"."key" = 'constance:posthog:RATE_LIMIT_ENABLED' + ORDER BY "posthog_instancesetting"."id" ASC + LIMIT 1 + ''' +# --- +# name: TestDecide.test_decide_doesnt_error_out_when_database_is_down.4 ''' SELECT 1 AS "a" FROM "posthog_grouptypemapping" @@ -149,7 +160,7 @@ LIMIT 1 ''' # --- -# name: TestDecide.test_decide_doesnt_error_out_when_database_is_down.4 +# name: TestDecide.test_decide_doesnt_error_out_when_database_is_down.5 ''' SELECT "posthog_instancesetting"."id", "posthog_instancesetting"."key", @@ -160,7 +171,7 @@ LIMIT 1 ''' # --- -# name: TestDecide.test_decide_doesnt_error_out_when_database_is_down.5 +# name: TestDecide.test_decide_doesnt_error_out_when_database_is_down.6 ''' SELECT "posthog_instancesetting"."id", "posthog_instancesetting"."key", @@ -171,7 +182,7 @@ LIMIT 1 ''' # --- -# name: TestDecide.test_decide_doesnt_error_out_when_database_is_down.6 +# name: 
TestDecide.test_decide_doesnt_error_out_when_database_is_down.7 ''' SELECT "posthog_instancesetting"."id", "posthog_instancesetting"."key", @@ -182,7 +193,7 @@ LIMIT 1 ''' # --- -# name: TestDecide.test_decide_doesnt_error_out_when_database_is_down.7 +# name: TestDecide.test_decide_doesnt_error_out_when_database_is_down.8 ''' SELECT "posthog_user"."id", "posthog_user"."password", @@ -213,7 +224,7 @@ LIMIT 21 ''' # --- -# name: TestDecide.test_decide_doesnt_error_out_when_database_is_down.8 +# name: TestDecide.test_decide_doesnt_error_out_when_database_is_down.9 ''' SELECT "posthog_featureflag"."id", "posthog_featureflag"."key", @@ -236,22 +247,6 @@ AND "posthog_featureflag"."team_id" = 2) ''' # --- -# name: TestDecide.test_decide_doesnt_error_out_when_database_is_down.9 - ''' - SELECT "posthog_pluginconfig"."id", - "posthog_pluginconfig"."web_token", - "posthog_pluginsourcefile"."updated_at", - "posthog_plugin"."updated_at", - "posthog_pluginconfig"."updated_at" - FROM "posthog_pluginconfig" - INNER JOIN "posthog_plugin" ON ("posthog_pluginconfig"."plugin_id" = "posthog_plugin"."id") - INNER JOIN "posthog_pluginsourcefile" ON ("posthog_plugin"."id" = "posthog_pluginsourcefile"."plugin_id") - WHERE ("posthog_pluginconfig"."enabled" - AND "posthog_pluginsourcefile"."filename" = 'site.ts' - AND "posthog_pluginsourcefile"."status" = 'TRANSPILED' - AND "posthog_pluginconfig"."team_id" = 2) - ''' -# --- # name: TestDecide.test_flag_with_behavioural_cohorts ''' SELECT "posthog_user"."id", From d5e6797ecdf2b971ad5ff9479254a580a55a7949 Mon Sep 17 00:00:00 2001 From: github-actions <41898282+github-actions[bot]@users.noreply.github.com> Date: Tue, 16 Apr 2024 20:57:55 +0000 Subject: [PATCH 14/26] Update query snapshots --- .../api/test/__snapshots__/test_decide.ambr | 37 +++++++++++-------- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/posthog/api/test/__snapshots__/test_decide.ambr b/posthog/api/test/__snapshots__/test_decide.ambr index ae8f9966d1136..b60f0660121b9 100644 --- a/posthog/api/test/__snapshots__/test_decide.ambr +++ b/posthog/api/test/__snapshots__/test_decide.ambr @@ -142,17 +142,6 @@ ''' # --- # name: TestDecide.test_decide_doesnt_error_out_when_database_is_down.3 - ''' - SELECT "posthog_instancesetting"."id", - "posthog_instancesetting"."key", - "posthog_instancesetting"."raw_value" - FROM "posthog_instancesetting" - WHERE "posthog_instancesetting"."key" = 'constance:posthog:RATE_LIMIT_ENABLED' - ORDER BY "posthog_instancesetting"."id" ASC - LIMIT 1 - ''' -# --- -# name: TestDecide.test_decide_doesnt_error_out_when_database_is_down.4 ''' SELECT 1 AS "a" FROM "posthog_grouptypemapping" @@ -160,7 +149,7 @@ LIMIT 1 ''' # --- -# name: TestDecide.test_decide_doesnt_error_out_when_database_is_down.5 +# name: TestDecide.test_decide_doesnt_error_out_when_database_is_down.4 ''' SELECT "posthog_instancesetting"."id", "posthog_instancesetting"."key", @@ -171,7 +160,7 @@ LIMIT 1 ''' # --- -# name: TestDecide.test_decide_doesnt_error_out_when_database_is_down.6 +# name: TestDecide.test_decide_doesnt_error_out_when_database_is_down.5 ''' SELECT "posthog_instancesetting"."id", "posthog_instancesetting"."key", @@ -182,7 +171,7 @@ LIMIT 1 ''' # --- -# name: TestDecide.test_decide_doesnt_error_out_when_database_is_down.7 +# name: TestDecide.test_decide_doesnt_error_out_when_database_is_down.6 ''' SELECT "posthog_instancesetting"."id", "posthog_instancesetting"."key", @@ -193,7 +182,7 @@ LIMIT 1 ''' # --- -# name: TestDecide.test_decide_doesnt_error_out_when_database_is_down.8 +# 
name: TestDecide.test_decide_doesnt_error_out_when_database_is_down.7 ''' SELECT "posthog_user"."id", "posthog_user"."password", @@ -224,7 +213,7 @@ LIMIT 21 ''' # --- -# name: TestDecide.test_decide_doesnt_error_out_when_database_is_down.9 +# name: TestDecide.test_decide_doesnt_error_out_when_database_is_down.8 ''' SELECT "posthog_featureflag"."id", "posthog_featureflag"."key", @@ -247,6 +236,22 @@ AND "posthog_featureflag"."team_id" = 2) ''' # --- +# name: TestDecide.test_decide_doesnt_error_out_when_database_is_down.9 + ''' + SELECT "posthog_pluginconfig"."id", + "posthog_pluginconfig"."web_token", + "posthog_pluginsourcefile"."updated_at", + "posthog_plugin"."updated_at", + "posthog_pluginconfig"."updated_at" + FROM "posthog_pluginconfig" + INNER JOIN "posthog_plugin" ON ("posthog_pluginconfig"."plugin_id" = "posthog_plugin"."id") + INNER JOIN "posthog_pluginsourcefile" ON ("posthog_plugin"."id" = "posthog_pluginsourcefile"."plugin_id") + WHERE ("posthog_pluginconfig"."enabled" + AND "posthog_pluginsourcefile"."filename" = 'site.ts' + AND "posthog_pluginsourcefile"."status" = 'TRANSPILED' + AND "posthog_pluginconfig"."team_id" = 2) + ''' +# --- # name: TestDecide.test_flag_with_behavioural_cohorts ''' SELECT "posthog_user"."id", From 1af7773ea265cf7e89c0148ab09ffadb11a24449 Mon Sep 17 00:00:00 2001 From: eric Date: Tue, 16 Apr 2024 17:06:48 -0400 Subject: [PATCH 15/26] fix types --- .../data_imports/workflow_activities/import_data.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/posthog/temporal/data_imports/workflow_activities/import_data.py b/posthog/temporal/data_imports/workflow_activities/import_data.py index 3c1f3ac3214c9..5872edf4f5777 100644 --- a/posthog/temporal/data_imports/workflow_activities/import_data.py +++ b/posthog/temporal/data_imports/workflow_activities/import_data.py @@ -125,9 +125,9 @@ async def import_data_activity(inputs: ImportDataActivityInputs) -> Tuple[TSchem from posthog.temporal.data_imports.pipelines.zendesk.helpers import zendesk_support credentials = ZendeskCredentialsToken( - token=model.pipeline.job_inputs.get("zendesk_api_key"), - subdomain=model.pipeline.job_inputs.get("zendesk_subdomain"), - email=model.pipeline.job_inputs.get("zendesk_email_address"), + model.pipeline.job_inputs.get("zendesk_subdomain"), + model.pipeline.job_inputs.get("zendesk_email_address"), + model.pipeline.job_inputs.get("zendesk_api_key"), ) data_support = zendesk_support(credentials=credentials, endpoints=tuple(endpoints), team_id=inputs.team_id) From 3a283b12bed2109ca0870fc7c34275b3ad1b2cff Mon Sep 17 00:00:00 2001 From: eric Date: Tue, 16 Apr 2024 17:14:57 -0400 Subject: [PATCH 16/26] update mypy --- .../data_imports/workflow_activities/import_data.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/posthog/temporal/data_imports/workflow_activities/import_data.py b/posthog/temporal/data_imports/workflow_activities/import_data.py index 5872edf4f5777..9c78555dfe93c 100644 --- a/posthog/temporal/data_imports/workflow_activities/import_data.py +++ b/posthog/temporal/data_imports/workflow_activities/import_data.py @@ -124,11 +124,10 @@ async def import_data_activity(inputs: ImportDataActivityInputs) -> Tuple[TSchem elif model.pipeline.source_type == ExternalDataSource.Type.ZENDESK: from posthog.temporal.data_imports.pipelines.zendesk.helpers import zendesk_support - credentials = ZendeskCredentialsToken( - model.pipeline.job_inputs.get("zendesk_subdomain"), - model.pipeline.job_inputs.get("zendesk_email_address"), - 
model.pipeline.job_inputs.get("zendesk_api_key"), - ) + credentials = ZendeskCredentialsToken() + credentials.token = model.pipeline.job_inputs.get("zendesk_api_key") + credentials.subdomain = model.pipeline.job_inputs.get("zendesk_subdomain") + credentials.email = model.pipeline.job_inputs.get("zendesk_email_address") data_support = zendesk_support(credentials=credentials, endpoints=tuple(endpoints), team_id=inputs.team_id) # Uncomment to support zendesk chat and talk From 180f8ab4d67b8c8771c87d7a4b633a8b592edcf9 Mon Sep 17 00:00:00 2001 From: eric Date: Tue, 16 Apr 2024 20:09:27 -0400 Subject: [PATCH 17/26] type ignore --- .../temporal/data_imports/workflow_activities/import_data.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/posthog/temporal/data_imports/workflow_activities/import_data.py b/posthog/temporal/data_imports/workflow_activities/import_data.py index 9c78555dfe93c..7fe58877f6067 100644 --- a/posthog/temporal/data_imports/workflow_activities/import_data.py +++ b/posthog/temporal/data_imports/workflow_activities/import_data.py @@ -124,7 +124,7 @@ async def import_data_activity(inputs: ImportDataActivityInputs) -> Tuple[TSchem elif model.pipeline.source_type == ExternalDataSource.Type.ZENDESK: from posthog.temporal.data_imports.pipelines.zendesk.helpers import zendesk_support - credentials = ZendeskCredentialsToken() + credentials = ZendeskCredentialsToken() # type: ignore credentials.token = model.pipeline.job_inputs.get("zendesk_api_key") credentials.subdomain = model.pipeline.job_inputs.get("zendesk_subdomain") credentials.email = model.pipeline.job_inputs.get("zendesk_email_address") From b353b0897cadb696c0bac1f8406258d1b8d8742b Mon Sep 17 00:00:00 2001 From: eric Date: Tue, 16 Apr 2024 20:16:24 -0400 Subject: [PATCH 18/26] add comment --- .../temporal/data_imports/workflow_activities/import_data.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/posthog/temporal/data_imports/workflow_activities/import_data.py b/posthog/temporal/data_imports/workflow_activities/import_data.py index 7fe58877f6067..bd2a9d4cf85ba 100644 --- a/posthog/temporal/data_imports/workflow_activities/import_data.py +++ b/posthog/temporal/data_imports/workflow_activities/import_data.py @@ -124,7 +124,8 @@ async def import_data_activity(inputs: ImportDataActivityInputs) -> Tuple[TSchem elif model.pipeline.source_type == ExternalDataSource.Type.ZENDESK: from posthog.temporal.data_imports.pipelines.zendesk.helpers import zendesk_support - credentials = ZendeskCredentialsToken() # type: ignore + # NOTE: this line errors on CI mypy but not locally. 
Putting arguments within the function causes the opposite error + credentials = ZendeskCredentialsToken() credentials.token = model.pipeline.job_inputs.get("zendesk_api_key") credentials.subdomain = model.pipeline.job_inputs.get("zendesk_subdomain") credentials.email = model.pipeline.job_inputs.get("zendesk_email_address") From 1925606c2b9db460b44715a1fd1ba2c862f04740 Mon Sep 17 00:00:00 2001 From: eric Date: Wed, 17 Apr 2024 11:30:27 -0400 Subject: [PATCH 19/26] add default args, fix missing schema sync creation, add deletion logic --- .../pipelines/zendesk/credentials.py | 12 ++++++------ posthog/warehouse/api/external_data_source.py | 17 ++++++++++++----- posthog/warehouse/data_load/service.py | 4 ++-- posthog/warehouse/data_load/validate_schema.py | 4 ++-- 4 files changed, 22 insertions(+), 15 deletions(-) diff --git a/posthog/temporal/data_imports/pipelines/zendesk/credentials.py b/posthog/temporal/data_imports/pipelines/zendesk/credentials.py index 1f8110ae9b911..c3fadade0e470 100644 --- a/posthog/temporal/data_imports/pipelines/zendesk/credentials.py +++ b/posthog/temporal/data_imports/pipelines/zendesk/credentials.py @@ -13,7 +13,7 @@ class ZendeskCredentialsBase(CredentialsConfiguration): The Base version of all the ZendeskCredential classes. """ - subdomain: str + subdomain: str = "" __config_gen_annotations__: ClassVar[List[str]] = [] @@ -23,8 +23,8 @@ class ZendeskCredentialsEmailPass(ZendeskCredentialsBase): This class is used to store credentials for Email + Password Authentication """ - email: str - password: TSecretValue + email: str = "" + password: TSecretValue = "" @configspec @@ -33,7 +33,7 @@ class ZendeskCredentialsOAuth(ZendeskCredentialsBase): This class is used to store credentials for OAuth Token Authentication """ - oauth_token: TSecretValue + oauth_token: TSecretValue = "" @configspec @@ -42,8 +42,8 @@ class ZendeskCredentialsToken(ZendeskCredentialsBase): This class is used to store credentials for Token Authentication """ - email: str - token: TSecretValue + email: str = "" + token: TSecretValue = "" TZendeskCredentials = Union[ZendeskCredentialsEmailPass, ZendeskCredentialsToken, ZendeskCredentialsOAuth] diff --git a/posthog/warehouse/api/external_data_source.py b/posthog/warehouse/api/external_data_source.py index 7a2cd91d1a5b1..2577ae88e4233 100644 --- a/posthog/warehouse/api/external_data_source.py +++ b/posthog/warehouse/api/external_data_source.py @@ -361,7 +361,12 @@ def destroy(self, request: Request, *args: Any, **kwargs: Any) -> Response: ) pass - delete_external_data_schedule(instance) + for schema in ExternalDataSchema.objects.filter( + team_id=self.team_id, source_id=instance.id, should_sync=True + ).all(): + delete_external_data_schedule(str(schema.id)) + + delete_external_data_schedule(str(instance.id)) return super().destroy(request, *args, **kwargs) @action(methods=["POST"], detail=True) @@ -383,11 +388,13 @@ def reload(self, request: Request, *args: Any, **kwargs: Any): for schema in ExternalDataSchema.objects.filter( team_id=self.team_id, source_id=instance.id, should_sync=True ).all(): - trigger_external_data_workflow(schema) + try: + trigger_external_data_workflow(schema) + except temporalio.service.RPCError as e: + # schedule doesn't exist + if e.message == "sql: no rows in result set": + sync_external_data_job_workflow(schema, create=True) - # schedule doesn't exist - if e.message == "sql: no rows in result set": - sync_external_data_job_workflow(instance, create=True) except Exception as e: logger.exception("Could not trigger external 
data job", exc_info=e) raise diff --git a/posthog/warehouse/data_load/service.py b/posthog/warehouse/data_load/service.py index b5969303622bf..fc76018734597 100644 --- a/posthog/warehouse/data_load/service.py +++ b/posthog/warehouse/data_load/service.py @@ -120,10 +120,10 @@ def unpause_external_data_schedule(external_data_source: ExternalDataSource): unpause_schedule(temporal, schedule_id=str(external_data_source.id)) -def delete_external_data_schedule(external_data_source: ExternalDataSource): +def delete_external_data_schedule(schedule_id: str): temporal = sync_connect() try: - delete_schedule(temporal, schedule_id=str(external_data_source.id)) + delete_schedule(temporal, schedule_id=schedule_id) except temporalio.service.RPCError as e: # Swallow error if schedule does not exist already if e.status == temporalio.service.RPCStatusCode.NOT_FOUND: diff --git a/posthog/warehouse/data_load/validate_schema.py b/posthog/warehouse/data_load/validate_schema.py index 052bb2721b7d9..dcfbb69595aa3 100644 --- a/posthog/warehouse/data_load/validate_schema.py +++ b/posthog/warehouse/data_load/validate_schema.py @@ -124,11 +124,11 @@ async def validate_schema_and_update_table( external_data_schema: ExternalDataSchema = await aget_schema_by_id(schema_id, team_id) _schema_id = external_data_schema.id - _schema_name = external_data_schema.name + _schema_name: str = external_data_schema.name table_name = f"{job.pipeline.prefix or ''}{job.pipeline.source_type}_{_schema_name}".lower() new_url_pattern = job.url_pattern_by_schema(camel_to_snake_case(_schema_name)) - row_count = table_row_counts.get(_schema_name, 0) + row_count = table_row_counts.get(_schema_name.lower(), 0) # Check try: From 45d0eda0aeb00a4d34ec27a4855d39b6947ffcdb Mon Sep 17 00:00:00 2001 From: eric Date: Wed, 17 Apr 2024 12:25:58 -0400 Subject: [PATCH 20/26] remove defaults --- .../temporal/data_imports/pipelines/zendesk/credentials.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/posthog/temporal/data_imports/pipelines/zendesk/credentials.py b/posthog/temporal/data_imports/pipelines/zendesk/credentials.py index c3fadade0e470..e4dfda2013573 100644 --- a/posthog/temporal/data_imports/pipelines/zendesk/credentials.py +++ b/posthog/temporal/data_imports/pipelines/zendesk/credentials.py @@ -24,7 +24,7 @@ class ZendeskCredentialsEmailPass(ZendeskCredentialsBase): """ email: str = "" - password: TSecretValue = "" + password: TSecretValue @configspec @@ -33,7 +33,7 @@ class ZendeskCredentialsOAuth(ZendeskCredentialsBase): This class is used to store credentials for OAuth Token Authentication """ - oauth_token: TSecretValue = "" + oauth_token: TSecretValue @configspec @@ -43,7 +43,7 @@ class ZendeskCredentialsToken(ZendeskCredentialsBase): """ email: str = "" - token: TSecretValue = "" + token: TSecretValue TZendeskCredentials = Union[ZendeskCredentialsEmailPass, ZendeskCredentialsToken, ZendeskCredentialsOAuth] From 1e8b4c51d7a17080a8b39f9ed6b830f1b08a227d Mon Sep 17 00:00:00 2001 From: eric Date: Wed, 17 Apr 2024 12:28:41 -0400 Subject: [PATCH 21/26] add blank --- posthog/migrations/0402_externaldatajob_schema.py | 2 +- posthog/warehouse/models/external_data_schema.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/posthog/migrations/0402_externaldatajob_schema.py b/posthog/migrations/0402_externaldatajob_schema.py index 7800b1137e005..93cd399aec2aa 100644 --- a/posthog/migrations/0402_externaldatajob_schema.py +++ b/posthog/migrations/0402_externaldatajob_schema.py @@ -20,6 +20,6 @@ class 
Migration(migrations.Migration): migrations.AddField( model_name="externaldataschema", name="status", - field=models.CharField(max_length=400, null=True), + field=models.CharField(max_length=400, null=True, blank=True), ), ] diff --git a/posthog/warehouse/models/external_data_schema.py b/posthog/warehouse/models/external_data_schema.py index 385aaa0d1664a..add9350230593 100644 --- a/posthog/warehouse/models/external_data_schema.py +++ b/posthog/warehouse/models/external_data_schema.py @@ -29,7 +29,7 @@ class Status(models.TextChoices): latest_error: models.TextField = models.TextField( null=True, help_text="The latest error that occurred when syncing this schema." ) - status: models.CharField = models.CharField(max_length=400, null=True) + status: models.CharField = models.CharField(max_length=400, null=True, blank=True) last_synced_at: models.DateTimeField = models.DateTimeField(null=True, blank=True) __repr__ = sane_repr("name") From b7e75dd4611f36e1a614c739cbb9c97a1a86e0b3 Mon Sep 17 00:00:00 2001 From: eric Date: Wed, 17 Apr 2024 14:34:05 -0400 Subject: [PATCH 22/26] cleanup --- .../data-warehouse/new/NewSourceWizard.tsx | 16 +++++++--------- .../data-warehouse/new/sourceWizardLogic.tsx | 1 + 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/frontend/src/scenes/data-warehouse/new/NewSourceWizard.tsx b/frontend/src/scenes/data-warehouse/new/NewSourceWizard.tsx index 48085faa831ef..3c61d2705ae0e 100644 --- a/frontend/src/scenes/data-warehouse/new/NewSourceWizard.tsx +++ b/frontend/src/scenes/data-warehouse/new/NewSourceWizard.tsx @@ -1,6 +1,5 @@ import { LemonButton } from '@posthog/lemon-ui' import { useActions, useValues } from 'kea' -import { router } from 'kea-router' import { PageHeader } from 'lib/components/PageHeader' import { FEATURE_FLAGS } from 'lib/constants' import { featureFlagLogic } from 'lib/logic/featureFlagLogic' @@ -10,7 +9,6 @@ import stripeLogo from 'public/stripe-logo.svg' import zendeskLogo from 'public/zendesk-logo.png' import { useCallback } from 'react' import { SceneExport } from 'scenes/sceneTypes' -import { urls } from 'scenes/urls' import { SourceConfig } from '~/types' @@ -26,7 +24,7 @@ export const scene: SceneExport = { } export function NewSourceWizard(): JSX.Element { const { modalTitle, modalCaption } = useValues(sourceWizardLogic) - const { onBack, onSubmit, closeWizard, cancelWizard } = useActions(sourceWizardLogic) + const { onBack, onSubmit, closeWizard } = useActions(sourceWizardLogic) const { currentStep, isLoading, canGoBack, canGoNext, nextButtonText, showSkipButton } = useValues(sourceWizardLogic) @@ -65,17 +63,17 @@ export function NewSourceWizard(): JSX.Element { ) }, [currentStep, isLoading, canGoNext, canGoBack, nextButtonText, showSkipButton]) - const onCancel = (): void => { - cancelWizard() - router.actions.push(urls.dataWarehouse()) - } - return ( <> - + Cancel diff --git a/frontend/src/scenes/data-warehouse/new/sourceWizardLogic.tsx b/frontend/src/scenes/data-warehouse/new/sourceWizardLogic.tsx index 7aa8813f57450..f41a90e086fb7 100644 --- a/frontend/src/scenes/data-warehouse/new/sourceWizardLogic.tsx +++ b/frontend/src/scenes/data-warehouse/new/sourceWizardLogic.tsx @@ -424,6 +424,7 @@ export const sourceWizardLogic = kea([ } }, closeWizard: () => { + actions.onClear() actions.clearSource() actions.loadSources(null) router.actions.push(urls.dataWarehouseSettings()) From 0acf4e2f7a7d11941a4ad5bd94c05b3417cc5d2f Mon Sep 17 00:00:00 2001 From: eric Date: Wed, 17 Apr 2024 15:13:08 -0400 Subject: [PATCH 23/26] add failsafe 
--- posthog/warehouse/api/external_data_source.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/posthog/warehouse/api/external_data_source.py b/posthog/warehouse/api/external_data_source.py index 2577ae88e4233..740bc1229680a 100644 --- a/posthog/warehouse/api/external_data_source.py +++ b/posthog/warehouse/api/external_data_source.py @@ -391,10 +391,14 @@ def reload(self, request: Request, *args: Any, **kwargs: Any): try: trigger_external_data_workflow(schema) except temporalio.service.RPCError as e: - # schedule doesn't exist + # if schema schedule doesn't exist if e.message == "sql: no rows in result set": sync_external_data_job_workflow(schema, create=True) + # source schedule doesn't exist + if e.message == "sql: no rows in result set": + sync_external_data_job_workflow(instance, create=True) + except Exception as e: logger.exception("Could not trigger external data job", exc_info=e) raise From 9df6108dc0d4c4a4baa6da48968929607fb826cb Mon Sep 17 00:00:00 2001 From: eric Date: Wed, 17 Apr 2024 15:46:22 -0400 Subject: [PATCH 24/26] update reload logic --- posthog/warehouse/api/external_data_source.py | 26 +++++++------------ posthog/warehouse/data_load/service.py | 5 ++++ 2 files changed, 15 insertions(+), 16 deletions(-) diff --git a/posthog/warehouse/api/external_data_source.py b/posthog/warehouse/api/external_data_source.py index 740bc1229680a..d0612729a116c 100644 --- a/posthog/warehouse/api/external_data_source.py +++ b/posthog/warehouse/api/external_data_source.py @@ -17,6 +17,7 @@ cancel_external_data_workflow, delete_data_import_folder, is_any_external_data_job_paused, + trigger_external_data_source_workflow, ) from posthog.warehouse.models import ExternalDataSource, ExternalDataSchema, ExternalDataJob from posthog.warehouse.api.external_data_schema import ExternalDataSchemaSerializer @@ -380,24 +381,17 @@ def reload(self, request: Request, *args: Any, **kwargs: Any): ) try: - trigger_external_data_workflow(instance) + trigger_external_data_source_workflow(instance) - except temporalio.service.RPCError as e: + except temporalio.service.RPCError: # if the source schedule has been removed - trigger the schema schedules - if e.message == "workflow execution already completed": - for schema in ExternalDataSchema.objects.filter( - team_id=self.team_id, source_id=instance.id, should_sync=True - ).all(): - try: - trigger_external_data_workflow(schema) - except temporalio.service.RPCError as e: - # if schema schedule doesn't exist - if e.message == "sql: no rows in result set": - sync_external_data_job_workflow(schema, create=True) - - # source schedule doesn't exist - if e.message == "sql: no rows in result set": - sync_external_data_job_workflow(instance, create=True) + for schema in ExternalDataSchema.objects.filter( + team_id=self.team_id, source_id=instance.id, should_sync=True + ).all(): + try: + trigger_external_data_workflow(schema) + except Exception as e: + logger.exception(f"Could not trigger external data job for schema {schema.name}", exc_info=e) except Exception as e: logger.exception("Could not trigger external data job", exc_info=e) diff --git a/posthog/warehouse/data_load/service.py b/posthog/warehouse/data_load/service.py index fc76018734597..31fc1aab6baff 100644 --- a/posthog/warehouse/data_load/service.py +++ b/posthog/warehouse/data_load/service.py @@ -95,6 +95,11 @@ async def a_sync_external_data_job_workflow( return external_data_schema +def trigger_external_data_source_workflow(external_data_source: ExternalDataSource): + temporal = 
sync_connect() + trigger_schedule(temporal, schedule_id=str(external_data_source.id)) + + def trigger_external_data_workflow(external_data_schema: ExternalDataSchema): temporal = sync_connect() trigger_schedule(temporal, schedule_id=str(external_data_schema.id)) From 9474a2945b7e4b35c0d55f86fa70b6b02e3f4e34 Mon Sep 17 00:00:00 2001 From: eric Date: Wed, 17 Apr 2024 16:00:21 -0400 Subject: [PATCH 25/26] create new schemas if triggered between reloads --- posthog/warehouse/api/external_data_source.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/posthog/warehouse/api/external_data_source.py b/posthog/warehouse/api/external_data_source.py index d0612729a116c..744797bd430b4 100644 --- a/posthog/warehouse/api/external_data_source.py +++ b/posthog/warehouse/api/external_data_source.py @@ -390,6 +390,11 @@ def reload(self, request: Request, *args: Any, **kwargs: Any): ).all(): try: trigger_external_data_workflow(schema) + except temporalio.service.RPCError as e: + # schedule doesn't exist + if e.message == "sql: no rows in result set": + sync_external_data_job_workflow(schema, create=True) + except Exception as e: logger.exception(f"Could not trigger external data job for schema {schema.name}", exc_info=e) From 9c11a8c7d7cfe393ff2957a9b9bc0dbf00603304 Mon Sep 17 00:00:00 2001 From: eric Date: Wed, 17 Apr 2024 16:08:27 -0400 Subject: [PATCH 26/26] add schema off check --- posthog/temporal/data_imports/external_data_job.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/posthog/temporal/data_imports/external_data_job.py b/posthog/temporal/data_imports/external_data_job.py index 62a47092d81bd..938ab423a0cbe 100644 --- a/posthog/temporal/data_imports/external_data_job.py +++ b/posthog/temporal/data_imports/external_data_job.py @@ -32,6 +32,7 @@ ExternalDataJob, get_active_schemas_for_source_id, ExternalDataSource, + aget_schema_by_id, ) from posthog.temporal.common.logger import bind_temporal_worker_logger from typing import Dict @@ -119,6 +120,12 @@ async def check_schedule_activity(inputs: ExternalDataWorkflowInputs) -> bool: logger.info(f"Deleted schedule for source {inputs.external_data_source_id}") return True + schema_model = await aget_schema_by_id(inputs.external_data_schema_id, inputs.team_id) + + # schema turned off so don't sync + if schema_model and not schema_model.should_sync: + return True + logger.info("Schema ID is set. Continuing...") return False
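
A note on the ZendeskCredentialsToken handling settled across PATCH 16-20: the in-code comment records that passing the values through the constructor trips mypy on CI but not locally (and the reverse when moved), so the import activity constructs the credentials with no arguments and assigns the fields afterwards, while the plain string fields in credentials.py gain empty-string defaults. The sketch below only mirrors that pattern as a standalone helper; it assumes the PostHog repo (and its pinned dlt version) is importable, and the helper name and job_inputs argument are illustrative, not part of the patch series.

    # Illustrative helper mirroring the credential construction in
    # posthog/temporal/data_imports/workflow_activities/import_data.py.
    # Assumes the PostHog repo context; the helper name and job_inputs
    # argument are made up for this sketch.
    from typing import Any, Dict

    from posthog.temporal.data_imports.pipelines.zendesk.credentials import ZendeskCredentialsToken


    def build_zendesk_token_credentials(job_inputs: Dict[str, Any]) -> ZendeskCredentialsToken:
        # No-argument construction plus attribute assignment is the pattern the
        # series lands on, sidestepping the CI-vs-local mypy disagreement over
        # the generated constructor signature noted in the PATCH 18 comment.
        credentials = ZendeskCredentialsToken()
        credentials.token = job_inputs.get("zendesk_api_key")
        credentials.subdomain = job_inputs.get("zendesk_subdomain")
        credentials.email = job_inputs.get("zendesk_email_address")
        return credentials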
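
The reload behaviour that PATCH 23-26 converge on reads as: trigger the source-level Temporal schedule; if that RPC fails because the source schedule has been removed, fall back to triggering each schema-level schedule with should_sync enabled, creating any schema schedule that does not exist yet; and, separately, check_schedule_activity skips the sync when the schema has been turned off. The sketch below consolidates the view-side fallback under those assumptions; the function name and logger setup are illustrative, and the real logic lives in the reload action of the external data source viewset.

    # Illustrative consolidation of the reload fallback from PATCH 24-25.
    # Assumes the PostHog repo context; the function name and logger setup
    # are made up for this sketch.
    import logging

    import temporalio.service

    from posthog.warehouse.data_load.service import (
        sync_external_data_job_workflow,
        trigger_external_data_source_workflow,
        trigger_external_data_workflow,
    )
    from posthog.warehouse.models import ExternalDataSchema, ExternalDataSource

    logger = logging.getLogger(__name__)


    def reload_source_with_fallback(instance: ExternalDataSource, team_id: int) -> None:
        try:
            # Preferred path: the source-level schedule still exists, so trigger it directly.
            trigger_external_data_source_workflow(instance)
        except temporalio.service.RPCError:
            # The source schedule has been removed - fall back to the schema-level schedules.
            for schema in ExternalDataSchema.objects.filter(
                team_id=team_id, source_id=instance.id, should_sync=True
            ).all():
                try:
                    trigger_external_data_workflow(schema)
                except temporalio.service.RPCError as e:
                    # Schema schedule doesn't exist yet (e.g. the schema was created
                    # between reloads) - create it instead of triggering it.
                    if e.message == "sql: no rows in result set":
                        sync_external_data_job_workflow(schema, create=True)
                except Exception as e:
                    logger.exception(f"Could not trigger external data job for schema {schema.name}", exc_info=e)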