diff --git a/posthog/temporal/data_imports/pipelines/sql_database/__init__.py b/posthog/temporal/data_imports/pipelines/sql_database/__init__.py index b376d4d2da8db..3939c264ea7e3 100644 --- a/posthog/temporal/data_imports/pipelines/sql_database/__init__.py +++ b/posthog/temporal/data_imports/pipelines/sql_database/__init__.py @@ -100,5 +100,6 @@ def sql_database( table_rows, name=table.name, primary_key=get_primary_key(table), + write_disposition="replace", spec=SqlDatabaseTableConfiguration, )(engine, table) diff --git a/posthog/temporal/data_imports/pipelines/stripe/__init__.py b/posthog/temporal/data_imports/pipelines/stripe/__init__.py index 0015480781b5e..442da6a22cf31 100644 --- a/posthog/temporal/data_imports/pipelines/stripe/__init__.py +++ b/posthog/temporal/data_imports/pipelines/stripe/__init__.py @@ -12,7 +12,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource: "name": "BalanceTransaction", "table_name": "balance_transaction", "primary_key": "id", - "write_disposition": "merge", + "write_disposition": "merge" if is_incremental else "replace", "columns": get_dlt_mapping_for_external_table("stripe_balancetransaction"), # type: ignore "endpoint": { "data_selector": "data", @@ -41,7 +41,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource: "name": "Charge", "table_name": "charge", "primary_key": "id", - "write_disposition": "merge", + "write_disposition": "merge" if is_incremental else "replace", "columns": get_dlt_mapping_for_external_table("stripe_charge"), # type: ignore "endpoint": { "data_selector": "data", @@ -69,7 +69,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource: "name": "Customer", "table_name": "customer", "primary_key": "id", - "write_disposition": "merge", + "write_disposition": "merge" if is_incremental else "replace", "columns": get_dlt_mapping_for_external_table("stripe_customer"), # type: ignore "endpoint": { "data_selector": "data", @@ -96,7 +96,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource: "name": "Invoice", "table_name": "invoice", "primary_key": "id", - "write_disposition": "merge", + "write_disposition": "merge" if is_incremental else "replace", "columns": get_dlt_mapping_for_external_table("stripe_invoice"), # type: ignore "endpoint": { "data_selector": "data", @@ -126,7 +126,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource: "name": "Price", "table_name": "price", "primary_key": "id", - "write_disposition": "merge", + "write_disposition": "merge" if is_incremental else "replace", "columns": get_dlt_mapping_for_external_table("stripe_price"), # type: ignore "endpoint": { "data_selector": "data", @@ -157,7 +157,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource: "name": "Product", "table_name": "product", "primary_key": "id", - "write_disposition": "merge", + "write_disposition": "merge" if is_incremental else "replace", "columns": get_dlt_mapping_for_external_table("stripe_product"), # type: ignore "endpoint": { "data_selector": "data", @@ -186,7 +186,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource: "name": "Subscription", "table_name": "subscription", "primary_key": "id", - "write_disposition": "merge", + "write_disposition": "merge" if is_incremental else "replace", "columns": get_dlt_mapping_for_external_table("stripe_subscription"), # type: ignore "endpoint": { "data_selector": "data", @@ -263,7 +263,7 @@ def stripe_source( }, "resource_defaults": { "primary_key": "id", - "write_disposition": "merge", + "write_disposition": "merge" if is_incremental else "replace", }, "resources": [get_resource(endpoint, is_incremental)], } diff --git a/posthog/temporal/data_imports/pipelines/zendesk/__init__.py b/posthog/temporal/data_imports/pipelines/zendesk/__init__.py index 47b47f546f367..fa5f01e87aa61 100644 --- a/posthog/temporal/data_imports/pipelines/zendesk/__init__.py +++ b/posthog/temporal/data_imports/pipelines/zendesk/__init__.py @@ -12,7 +12,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource: "name": "brands", "table_name": "brands", "primary_key": "id", - "write_disposition": "merge", + "write_disposition": "merge" if is_incremental else "replace", "columns": get_dlt_mapping_for_external_table("zendesk_brands"), # type: ignore "endpoint": { "data_selector": "brands", @@ -30,7 +30,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource: "name": "organizations", "table_name": "organizations", "primary_key": "id", - "write_disposition": "merge", + "write_disposition": "merge" if is_incremental else "replace", "columns": get_dlt_mapping_for_external_table("zendesk_organizations"), # type: ignore "endpoint": { "data_selector": "organizations", @@ -48,7 +48,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource: "name": "groups", "table_name": "groups", "primary_key": "id", - "write_disposition": "merge", + "write_disposition": "merge" if is_incremental else "replace", "columns": get_dlt_mapping_for_external_table("zendesk_groups"), # type: ignore "endpoint": { "data_selector": "groups", @@ -68,7 +68,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource: "name": "sla_policies", "table_name": "sla_policies", "primary_key": "id", - "write_disposition": "merge", + "write_disposition": "merge" if is_incremental else "replace", "columns": get_dlt_mapping_for_external_table("zendesk_sla_policies"), # type: ignore "endpoint": { "data_selector": "sla_policies", @@ -83,7 +83,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource: "name": "users", "table_name": "users", "primary_key": "id", - "write_disposition": "merge", + "write_disposition": "merge" if is_incremental else "replace", "columns": get_dlt_mapping_for_external_table("zendesk_users"), # type: ignore "endpoint": { "data_selector": "users", @@ -106,7 +106,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource: "name": "ticket_fields", "table_name": "ticket_fields", "primary_key": "id", - "write_disposition": "merge", + "write_disposition": "merge" if is_incremental else "replace", "columns": get_dlt_mapping_for_external_table("zendesk_ticket_fields"), # type: ignore "endpoint": { "data_selector": "ticket_fields", @@ -127,7 +127,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource: "name": "ticket_events", "table_name": "ticket_events", "primary_key": "id", - "write_disposition": "merge", + "write_disposition": "merge" if is_incremental else "replace", "columns": get_dlt_mapping_for_external_table("zendesk_ticket_events"), # type: ignore "endpoint": { "data_selector": "ticket_events", @@ -150,7 +150,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource: "name": "tickets", "table_name": "tickets", "primary_key": "id", - "write_disposition": "merge", + "write_disposition": "merge" if is_incremental else "replace", "columns": get_dlt_mapping_for_external_table("zendesk_tickets"), # type: ignore "endpoint": { "data_selector": "tickets", @@ -172,7 +172,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource: "name": "ticket_metric_events", "table_name": "ticket_metric_events", "primary_key": "id", - "write_disposition": "merge", + "write_disposition": "merge" if is_incremental else "replace", "columns": get_dlt_mapping_for_external_table("zendesk_ticket_metric_events"), # type: ignore "endpoint": { "data_selector": "ticket_metric_events", @@ -263,7 +263,7 @@ def zendesk_source( }, "resource_defaults": { "primary_key": "id", - "write_disposition": "merge", + "write_disposition": "merge" if is_incremental else "replace", }, "resources": [get_resource(endpoint, is_incremental)], } diff --git a/posthog/warehouse/models/external_data_job.py b/posthog/warehouse/models/external_data_job.py index 0b40c41f11069..e9504686d560d 100644 --- a/posthog/warehouse/models/external_data_job.py +++ b/posthog/warehouse/models/external_data_job.py @@ -33,10 +33,7 @@ class Status(models.TextChoices): @property def folder_path(self) -> str: - if self.schema and self.schema.is_incremental: - return f"team_{self.team_id}_{self.pipeline.source_type}_{str(self.schema.pk)}".lower().replace("-", "_") - - return f"team_{self.team_id}_{self.pipeline.source_type}_{str(self.pk)}".lower().replace("-", "_") + return f"team_{self.team_id}_{self.pipeline.source_type}_{str(self.schema_id)}".lower().replace("-", "_") def url_pattern_by_schema(self, schema: str) -> str: if TEST: