Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(data-warehouse): enable more schemas #19515

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions posthog/temporal/data_imports/external_data_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@
from posthog.temporal.batch_exports.base import PostHogWorkflow

from posthog.warehouse.data_load.validate_schema import validate_schema_and_update_table
from posthog.temporal.data_imports.pipelines.schemas import PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING
from posthog.temporal.data_imports.pipelines.schemas import (
PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING,
PIPELINE_TYPE_SCHEMA_MAPPING,
)
from posthog.temporal.data_imports.pipelines.pipeline import DataImportPipeline, PipelineInputs
from posthog.warehouse.external_data_source.jobs import (
create_external_data_job,
Expand Down Expand Up @@ -51,7 +54,8 @@ async def create_external_data_job_model(inputs: CreateExternalDataJobInputs) ->

# Sync schemas if they have changed
await sync_to_async(sync_old_schemas_with_new_schemas)( # type: ignore
list(PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING[source.source_type]), # type: ignore
all_schemas=list(PIPELINE_TYPE_SCHEMA_MAPPING[source.source_type]),
default_schemas=list(PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING[source.source_type]),
source_id=inputs.external_data_source_id,
team_id=inputs.team_id,
)
Expand Down
5 changes: 3 additions & 2 deletions posthog/temporal/data_imports/pipelines/schemas.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from posthog.warehouse.models import ExternalDataSource
from posthog.temporal.data_imports.pipelines.stripe.settings import ALL_ENDPOINTS, DEFAULT_ENDPOINTS

# Schemas that are enabled (should_sync=True) by default when a source of this
# type is first linked.
PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING = {ExternalDataSource.Type.STRIPE: DEFAULT_ENDPOINTS}

# Every schema a source of this type can expose, including ones that are
# created disabled and must be opted into.
PIPELINE_TYPE_SCHEMA_MAPPING = {ExternalDataSource.Type.STRIPE: ALL_ENDPOINTS}
46 changes: 46 additions & 0 deletions posthog/temporal/data_imports/pipelines/stripe/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,49 @@
# Full list of the Stripe API endpoints you can find here: https://stripe.com/docs/api.
# These endpoints are converted into ExternalDataSchema objects when a source is linked.
ENDPOINTS = ("BalanceTransaction", "Subscription", "Customer", "Product", "Price", "Invoice", "Charge")

# Endpoints that are synced by default when a Stripe source is linked.
# Alias of ENDPOINTS (rather than a second identical literal) so the two
# tuples can never silently drift apart.
DEFAULT_ENDPOINTS = ENDPOINTS

# Every Stripe endpoint we can expose as a schema, whether or not it is
# synced by default. Kept alphabetical for easy diffing against the Stripe
# API reference.
ALL_ENDPOINTS = (
    "Account",
    "ApplicationFee",
    "BalanceTransaction",
    "Charge",
    "CountrySpec",
    "Coupon",
    "CreditNote",
    "CreditNoteLineItem",
    "Customer",
    "CustomerBalanceTransaction",
    "Dispute",
    "Event",
    "ExchangeRate",
    "File",
    "FileLink",
    "FileUpload",
    "Invoice",
    "InvoiceItem",
    "PaymentIntent",
    "PaymentLink",
    "PaymentMethod",
    "PaymentMethodConfiguration",
    "PaymentMethodDomain",
    "Payout",
    "Plan",
    "Price",
    "Product",
    "PromotionCode",
    "Quote",
    "Refund",
    "Review",
    "SetupAttempt",
    "SetupIntent",
    "ShippingRate",
    "Subscription",
    "SubscriptionItem",
    "SubscriptionSchedule",
    "TaxCode",
    "TaxRate",
    "Topup",
    "Transfer",
)
5 changes: 5 additions & 0 deletions posthog/temporal/tests/test_external_data_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
)
from posthog.warehouse.models import (
get_latest_run_if_exists,
get_all_schemas_for_source_id,
DataWarehouseTable,
ExternalDataJob,
ExternalDataSource,
Expand All @@ -30,6 +31,7 @@

from posthog.temporal.data_imports.pipelines.schemas import (
PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING,
PIPELINE_TYPE_SCHEMA_MAPPING,
)
from posthog.temporal.data_imports.pipelines.pipeline import DataImportPipeline
from temporalio.testing import WorkflowEnvironment
Expand Down Expand Up @@ -112,6 +114,9 @@ async def test_create_external_job_activity(activity_environment, team, **kwargs
assert await sync_to_async(runs.exists)() # type:ignore
assert len(schemas) == len(PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING[new_source.source_type])

all_schemas = await sync_to_async(get_all_schemas_for_source_id)(source_id=new_source.pk, team_id=team.pk) # type: ignore
assert len(all_schemas) == len(PIPELINE_TYPE_SCHEMA_MAPPING[new_source.source_type])


@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
Expand Down
7 changes: 4 additions & 3 deletions posthog/warehouse/models/external_data_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@ def get_all_schemas_for_source_id(source_id: uuid.UUID, team_id: int):
return [val["name"] for val in schemas]


def sync_old_schemas_with_new_schemas(all_schemas: list, default_schemas: list, source_id: uuid.UUID, team_id: int):
    """Create ExternalDataSchema rows for schemas the source exposes but the team lacks.

    Args:
        all_schemas: every schema name the source type can expose.
        default_schemas: subset of schema names that should sync by default.
        source_id: the ExternalDataSource the schemas belong to.
        team_id: owning team.

    Schemas already present for this source/team are left untouched. Newly
    created rows get should_sync=True only when the schema is in the default
    set; all other new schemas are created disabled.
    """
    # Sets give O(1) membership tests instead of scanning lists per schema.
    existing = set(get_all_schemas_for_source_id(source_id=source_id, team_id=team_id))
    defaults = set(default_schemas)

    for schema in all_schemas:
        if schema in existing:
            continue  # already provisioned for this source/team
        ExternalDataSchema.objects.create(
            name=schema,
            team_id=team_id,
            source_id=source_id,
            should_sync=schema in defaults,
        )
Loading