diff --git a/posthog/temporal/data_imports/external_data_job.py b/posthog/temporal/data_imports/external_data_job.py index cdb218c0cce31..4aecd3632ab28 100644 --- a/posthog/temporal/data_imports/external_data_job.py +++ b/posthog/temporal/data_imports/external_data_job.py @@ -11,7 +11,10 @@ from posthog.temporal.batch_exports.base import PostHogWorkflow from posthog.warehouse.data_load.validate_schema import validate_schema_and_update_table -from posthog.temporal.data_imports.pipelines.schemas import PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING +from posthog.temporal.data_imports.pipelines.schemas import ( + PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING, + PIPELINE_TYPE_SCHEMA_MAPPING, +) from posthog.temporal.data_imports.pipelines.pipeline import DataImportPipeline, PipelineInputs from posthog.warehouse.external_data_source.jobs import ( create_external_data_job, @@ -51,7 +54,8 @@ async def create_external_data_job_model(inputs: CreateExternalDataJobInputs) -> # Sync schemas if they have changed await sync_to_async(sync_old_schemas_with_new_schemas)( # type: ignore - list(PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING[source.source_type]), # type: ignore + all_schemas=list(PIPELINE_TYPE_SCHEMA_MAPPING[source.source_type]), + default_schemas=list(PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING[source.source_type]), source_id=inputs.external_data_source_id, team_id=inputs.team_id, ) diff --git a/posthog/temporal/data_imports/pipelines/schemas.py b/posthog/temporal/data_imports/pipelines/schemas.py index a62db7d664e40..a9af34f3c5a12 100644 --- a/posthog/temporal/data_imports/pipelines/schemas.py +++ b/posthog/temporal/data_imports/pipelines/schemas.py @@ -1,4 +1,5 @@ from posthog.warehouse.models import ExternalDataSource -from posthog.temporal.data_imports.pipelines.stripe.settings import ENDPOINTS +from posthog.temporal.data_imports.pipelines.stripe.settings import ALL_ENDPOINTS, DEFAULT_ENDPOINTS -PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING = {ExternalDataSource.Type.STRIPE: ENDPOINTS} +PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING = {ExternalDataSource.Type.STRIPE: DEFAULT_ENDPOINTS} +PIPELINE_TYPE_SCHEMA_MAPPING = {ExternalDataSource.Type.STRIPE: ALL_ENDPOINTS} diff --git a/posthog/temporal/data_imports/pipelines/stripe/settings.py b/posthog/temporal/data_imports/pipelines/stripe/settings.py index 70de092e09412..5e4cbe8268997 100644 --- a/posthog/temporal/data_imports/pipelines/stripe/settings.py +++ b/posthog/temporal/data_imports/pipelines/stripe/settings.py @@ -4,3 +4,49 @@ # Full list of the Stripe API endpoints you can find here: https://stripe.com/docs/api. # These endpoints are converted into ExternalDataSchema objects when a source is linked. ENDPOINTS = ("BalanceTransaction", "Subscription", "Customer", "Product", "Price", "Invoice", "Charge") + +DEFAULT_ENDPOINTS = ("BalanceTransaction", "Subscription", "Customer", "Product", "Price", "Invoice", "Charge") + +ALL_ENDPOINTS = ( + "Account", + "ApplicationFee", + "BalanceTransaction", + "Charge", + "CountrySpec", + "Coupon", + "CreditNote", + "CreditNoteLineItem", + "Customer", + "CustomerBalanceTransaction", + "Dispute", + "Event", + "ExchangeRate", + "File", + "FileLink", + "FileUpload", + "Invoice", + "InvoiceItem", + "PaymentIntent", + "PaymentLink", + "PaymentMethod", + "PaymentMethodConfiguration", + "PaymentMethodDomain", + "Payout", + "Plan", + "Price", + "Product", + "PromotionCode", + "Quote", + "Refund", + "Review", + "SetupAttempt", + "SetupIntent", + "ShippingRate", + "Subscription", + "SubscriptionItem", + "SubscriptionSchedule", + "TaxCode", + "TaxRate", + "Topup", + "Transfer", +) diff --git a/posthog/temporal/tests/test_external_data_job.py b/posthog/temporal/tests/test_external_data_job.py index 1af196f368831..edd0f6b79197c 100644 --- a/posthog/temporal/tests/test_external_data_job.py +++ b/posthog/temporal/tests/test_external_data_job.py @@ -22,6 +22,7 @@ ) from posthog.warehouse.models import ( get_latest_run_if_exists, + get_all_schemas_for_source_id, DataWarehouseTable, ExternalDataJob, ExternalDataSource, @@ -30,6 +31,7 @@ from posthog.temporal.data_imports.pipelines.schemas import ( PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING, + PIPELINE_TYPE_SCHEMA_MAPPING, ) from posthog.temporal.data_imports.pipelines.pipeline import DataImportPipeline from temporalio.testing import WorkflowEnvironment @@ -112,6 +114,9 @@ async def test_create_external_job_activity(activity_environment, team, **kwargs assert await sync_to_async(runs.exists)() # type:ignore assert len(schemas) == len(PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING[new_source.source_type]) + all_schemas = await sync_to_async(get_all_schemas_for_source_id)(source_id=new_source.pk, team_id=team.pk) # type: ignore + assert len(all_schemas) == len(PIPELINE_TYPE_SCHEMA_MAPPING[new_source.source_type]) + @pytest.mark.django_db(transaction=True) @pytest.mark.asyncio diff --git a/posthog/warehouse/models/external_data_schema.py b/posthog/warehouse/models/external_data_schema.py index 861f73f1c420a..f21f1ce462a64 100644 --- a/posthog/warehouse/models/external_data_schema.py +++ b/posthog/warehouse/models/external_data_schema.py @@ -38,9 +38,10 @@ def get_all_schemas_for_source_id(source_id: uuid.UUID, team_id: int): return [val["name"] for val in schemas] -def sync_old_schemas_with_new_schemas(new_schemas: list, source_id: uuid.UUID, team_id: int): +def sync_old_schemas_with_new_schemas(all_schemas: list, default_schemas: list, source_id: uuid.UUID, team_id: int): old_schemas = get_all_schemas_for_source_id(source_id=source_id, team_id=team_id) - schemas_to_create = [schema for schema in new_schemas if schema not in old_schemas] + schemas_to_create = [schema for schema in all_schemas if schema not in old_schemas] for schema in schemas_to_create: - ExternalDataSchema.objects.create(name=schema, team_id=team_id, source_id=source_id) + should_sync = schema in default_schemas + ExternalDataSchema.objects.create(name=schema, team_id=team_id, source_id=source_id, should_sync=should_sync)