Skip to content

Commit

Permalink
feat(data-warehouse): Added template creation at the end of connecting Stripe source for the first time (#20933)
Browse files Browse the repository at this point in the history

* Added template creation at the end of connecting Stripe source for the first time

* Fixed tests

* add import

---------

Co-authored-by: eric <[email protected]>
Gilbert09 and EDsCODE authored Mar 18, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent 2778c6b commit badc581
Showing 4 changed files with 95 additions and 0 deletions.
2 changes: 2 additions & 0 deletions posthog/temporal/data_imports/__init__.py
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@
update_external_data_job_model,
run_external_data_job,
validate_schema_activity,
create_source_templates,
)

WORKFLOWS = [ExternalDataJobWorkflow]
@@ -13,4 +14,5 @@
update_external_data_job_model,
run_external_data_job,
validate_schema_activity,
create_source_templates,
]
20 changes: 20 additions & 0 deletions posthog/temporal/data_imports/external_data_job.py
Original file line number Diff line number Diff line change
@@ -10,6 +10,7 @@

# TODO: remove dependency
from posthog.temporal.batch_exports.base import PostHogWorkflow
from posthog.warehouse.data_load.source_templates import create_warehouse_templates_for_source

from posthog.warehouse.data_load.validate_schema import validate_schema_and_update_table
from posthog.temporal.data_imports.pipelines.pipeline import DataImportPipeline, PipelineInputs
@@ -125,6 +126,17 @@ async def validate_schema_activity(inputs: ValidateSchemaInputs) -> None:
)


@dataclasses.dataclass
class CreateSourceTemplateInputs:
    """Inputs for the `create_source_templates` Temporal activity."""

    # ID of the team the source templates are created for
    team_id: int
    # ID of the ExternalDataJob run that just finished syncing
    run_id: str


@activity.defn
async def create_source_templates(inputs: CreateSourceTemplateInputs) -> None:
    """Activity wrapper: create warehouse source templates for the finished job run."""
    await create_warehouse_templates_for_source(
        team_id=inputs.team_id,
        run_id=inputs.run_id,
    )


@dataclasses.dataclass
class ExternalDataWorkflowInputs:
team_id: int
@@ -291,6 +303,14 @@ async def run(self, inputs: ExternalDataWorkflowInputs):
retry_policy=RetryPolicy(maximum_attempts=2),
)

# Create source templates
await workflow.execute_activity(
create_source_templates,
CreateSourceTemplateInputs(team_id=inputs.team_id, run_id=run_id),
start_to_close_timeout=dt.timedelta(minutes=10),
retry_policy=RetryPolicy(maximum_attempts=2),
)

except exceptions.ActivityError as e:
if isinstance(e.cause, exceptions.CancelledError):
update_inputs.status = ExternalDataJob.Status.CANCELLED
Original file line number Diff line number Diff line change
@@ -11,6 +11,7 @@
ValidateSchemaInputs,
create_external_data_job,
create_external_data_job_model,
create_source_templates,
run_external_data_job,
update_external_data_job_model,
validate_schema_activity,
@@ -781,6 +782,7 @@ async def test_external_data_job_workflow_blank(team, **kwargs):
update_external_data_job_model,
run_external_data_job,
validate_schema_activity,
create_source_templates,
],
workflow_runner=UnsandboxedWorkflowRunner(),
):
@@ -844,6 +846,7 @@ async def mock_async_func(inputs):
update_external_data_job_model,
run_external_data_job,
validate_schema_activity,
create_source_templates,
],
workflow_runner=UnsandboxedWorkflowRunner(),
):
70 changes: 70 additions & 0 deletions posthog/warehouse/data_load/source_templates.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
from posthog.temporal.common.logger import bind_temporal_worker_logger
from posthog.warehouse.models.external_data_job import ExternalDataJob, get_external_data_job, get_latest_run_if_exists
from posthog.warehouse.models.external_data_source import ExternalDataSource
from posthog.warehouse.models.join import DataWarehouseJoin
from posthog.warehouse.util import database_sync_to_async


@database_sync_to_async
def database_operations(team_id: int, table_prefix: str) -> None:
    """Ensure the default persons <-> Stripe joins exist for a team.

    Creates two `DataWarehouseJoin` rows (if missing) that join the `persons`
    table's `properties.email` onto the synced Stripe customer and invoice
    tables. Idempotent: existing joins are left untouched.

    Args:
        team_id: Team the joins belong to.
        table_prefix: Optional prefix of the synced warehouse table names
            (e.g. from the pipeline's configured prefix; may be "").
    """
    # (joining table suffix, joining table key, exposed field name) per template
    join_specs = [
        ("stripe_customer", "email", "stripe_customer"),
        ("stripe_invoice", "customer_email", "stripe_invoice"),
    ]

    for table_suffix, joining_key, field_name in join_specs:
        # get_or_create avoids the check-then-insert race of exists() + create()
        # and removes the duplicated filter/create pairs of the original.
        DataWarehouseJoin.objects.get_or_create(
            team_id=team_id,
            source_table_name="persons",
            source_table_key="properties.email",
            joining_table_name=f"{table_prefix}{table_suffix}",
            joining_table_key=joining_key,
            field_name=field_name,
        )


async def create_warehouse_templates_for_source(team_id: int, run_id: str) -> None:
    """Create the default Stripe warehouse templates after a job's first sync.

    Only acts when the job's source is Stripe and this is the pipeline's first
    completed run; otherwise it logs a skip message and returns without
    touching the database.

    Args:
        team_id: Team the templates are created for.
        run_id: ID of the ExternalDataJob run that just finished.
    """
    logger = await bind_temporal_worker_logger(team_id=team_id)

    job: ExternalDataJob = await get_external_data_job(job_id=run_id)
    previous_run: ExternalDataJob | None = await get_latest_run_if_exists(job.team_id, job.pipeline_id)

    # Templates are only seeded once, on the very first Stripe sync.
    is_first_stripe_sync = (
        job.pipeline.source_type == ExternalDataSource.Type.STRIPE and previous_run is None
    )

    if not is_first_stripe_sync:
        logger.info(
            f"Create warehouse templates skipped for job {run_id}",
        )
        return

    await database_operations(team_id, job.pipeline.prefix or "")

    logger.info(
        f"Created warehouse template for job {run_id}",
    )

0 comments on commit badc581

Please sign in to comment.