diff --git a/posthog/temporal/data_imports/__init__.py b/posthog/temporal/data_imports/__init__.py index 35e20f0ffc50e..c6a142c712d39 100644 --- a/posthog/temporal/data_imports/__init__.py +++ b/posthog/temporal/data_imports/__init__.py @@ -4,6 +4,7 @@ update_external_data_job_model, run_external_data_job, validate_schema_activity, + create_source_templates, ) WORKFLOWS = [ExternalDataJobWorkflow] @@ -13,4 +14,5 @@ update_external_data_job_model, run_external_data_job, validate_schema_activity, + create_source_templates, ] diff --git a/posthog/temporal/data_imports/external_data_job.py b/posthog/temporal/data_imports/external_data_job.py index 3288d9a313c9d..db99eeb1de315 100644 --- a/posthog/temporal/data_imports/external_data_job.py +++ b/posthog/temporal/data_imports/external_data_job.py @@ -10,6 +10,7 @@ # TODO: remove dependency from posthog.temporal.batch_exports.base import PostHogWorkflow +from posthog.warehouse.data_load.source_templates import create_warehouse_templates_for_source from posthog.warehouse.data_load.validate_schema import validate_schema_and_update_table from posthog.temporal.data_imports.pipelines.pipeline import DataImportPipeline, PipelineInputs @@ -125,6 +126,17 @@ async def validate_schema_activity(inputs: ValidateSchemaInputs) -> None: ) +@dataclasses.dataclass +class CreateSourceTemplateInputs: + team_id: int + run_id: str + + +@activity.defn +async def create_source_templates(inputs: CreateSourceTemplateInputs) -> None: + await create_warehouse_templates_for_source(team_id=inputs.team_id, run_id=inputs.run_id) + + @dataclasses.dataclass class ExternalDataWorkflowInputs: team_id: int @@ -291,6 +303,14 @@ async def run(self, inputs: ExternalDataWorkflowInputs): retry_policy=RetryPolicy(maximum_attempts=2), ) + # Create source templates + await workflow.execute_activity( + create_source_templates, + CreateSourceTemplateInputs(team_id=inputs.team_id, run_id=run_id), + start_to_close_timeout=dt.timedelta(minutes=10), + retry_policy=RetryPolicy(maximum_attempts=2), + ) + except exceptions.ActivityError as e: if isinstance(e.cause, exceptions.CancelledError): update_inputs.status = ExternalDataJob.Status.CANCELLED diff --git a/posthog/temporal/tests/external_data/test_external_data_job.py b/posthog/temporal/tests/external_data/test_external_data_job.py index 9f0ca2d9a0d32..45de2c687cb7d 100644 --- a/posthog/temporal/tests/external_data/test_external_data_job.py +++ b/posthog/temporal/tests/external_data/test_external_data_job.py @@ -11,6 +11,7 @@ ValidateSchemaInputs, create_external_data_job, create_external_data_job_model, + create_source_templates, run_external_data_job, update_external_data_job_model, validate_schema_activity, @@ -781,6 +782,7 @@ async def test_external_data_job_workflow_blank(team, **kwargs): update_external_data_job_model, run_external_data_job, validate_schema_activity, + create_source_templates, ], workflow_runner=UnsandboxedWorkflowRunner(), ): @@ -844,6 +846,7 @@ async def mock_async_func(inputs): update_external_data_job_model, run_external_data_job, validate_schema_activity, + create_source_templates, ], workflow_runner=UnsandboxedWorkflowRunner(), ): diff --git a/posthog/warehouse/data_load/source_templates.py b/posthog/warehouse/data_load/source_templates.py new file mode 100644 index 0000000000000..afc61cccd1be3 --- /dev/null +++ b/posthog/warehouse/data_load/source_templates.py @@ -0,0 +1,70 @@ +from posthog.temporal.common.logger import bind_temporal_worker_logger +from posthog.warehouse.models.external_data_job import ExternalDataJob, get_external_data_job, get_latest_run_if_exists +from posthog.warehouse.models.external_data_source import ExternalDataSource +from posthog.warehouse.models.join import DataWarehouseJoin +from posthog.warehouse.util import database_sync_to_async + + +@database_sync_to_async +def database_operations(team_id: int, table_prefix: str) -> None: + customer_join_exists = DataWarehouseJoin.objects.filter( + team_id=team_id, + source_table_name="persons", + source_table_key="properties.email", + joining_table_name=f"{table_prefix}stripe_customer", + joining_table_key="email", + field_name="stripe_customer", + ).exists() + + invoice_join_exists = DataWarehouseJoin.objects.filter( + team_id=team_id, + source_table_name="persons", + source_table_key="properties.email", + joining_table_name=f"{table_prefix}stripe_invoice", + joining_table_key="customer_email", + field_name="stripe_invoice", + ).exists() + + if not customer_join_exists: + DataWarehouseJoin.objects.create( + team_id=team_id, + source_table_name="persons", + source_table_key="properties.email", + joining_table_name=f"{table_prefix}stripe_customer", + joining_table_key="email", + field_name="stripe_customer", + ) + + if not invoice_join_exists: + DataWarehouseJoin.objects.create( + team_id=team_id, + source_table_name="persons", + source_table_key="properties.email", + joining_table_name=f"{table_prefix}stripe_invoice", + joining_table_key="customer_email", + field_name="stripe_invoice", + ) + + +async def create_warehouse_templates_for_source(team_id: int, run_id: str) -> None: + logger = await bind_temporal_worker_logger(team_id=team_id) + + job: ExternalDataJob = await get_external_data_job(job_id=run_id) + last_successful_job: ExternalDataJob | None = await get_latest_run_if_exists(job.team_id, job.pipeline_id) + + source: ExternalDataSource.Type = job.pipeline.source_type + + # Quick exit if this isn't the first sync, or a stripe source + if source != ExternalDataSource.Type.STRIPE or last_successful_job is not None: + logger.info( + f"Create warehouse templates skipped for job {run_id}", + ) + return + + table_prefix = job.pipeline.prefix or "" + + await database_operations(team_id, table_prefix) + + logger.info( + f"Created warehouse template for job {run_id}", + )