Skip to content

Commit

Permalink
chore(data-warehouse): Allow syncing to a single S3 folder for the li…
Browse files Browse the repository at this point in the history
…fe of a schema (#23155)

Allow syncing to a single S3 folder for the life of a schema
  • Loading branch information
Gilbert09 authored Jun 25, 2024
1 parent 9b3942a commit 48e0672
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 22 deletions.
16 changes: 8 additions & 8 deletions posthog/temporal/data_imports/pipelines/stripe/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
"name": "BalanceTransaction",
"table_name": "balance_transaction",
"primary_key": "id",
"write_disposition": "merge",
"write_disposition": "merge" if is_incremental else "replace",
"columns": get_dlt_mapping_for_external_table("stripe_balancetransaction"), # type: ignore
"endpoint": {
"data_selector": "data",
Expand Down Expand Up @@ -41,7 +41,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
"name": "Charge",
"table_name": "charge",
"primary_key": "id",
"write_disposition": "merge",
"write_disposition": "merge" if is_incremental else "replace",
"columns": get_dlt_mapping_for_external_table("stripe_charge"), # type: ignore
"endpoint": {
"data_selector": "data",
Expand Down Expand Up @@ -69,7 +69,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
"name": "Customer",
"table_name": "customer",
"primary_key": "id",
"write_disposition": "merge",
"write_disposition": "merge" if is_incremental else "replace",
"columns": get_dlt_mapping_for_external_table("stripe_customer"), # type: ignore
"endpoint": {
"data_selector": "data",
Expand All @@ -96,7 +96,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
"name": "Invoice",
"table_name": "invoice",
"primary_key": "id",
"write_disposition": "merge",
"write_disposition": "merge" if is_incremental else "replace",
"columns": get_dlt_mapping_for_external_table("stripe_invoice"), # type: ignore
"endpoint": {
"data_selector": "data",
Expand Down Expand Up @@ -126,7 +126,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
"name": "Price",
"table_name": "price",
"primary_key": "id",
"write_disposition": "merge",
"write_disposition": "merge" if is_incremental else "replace",
"columns": get_dlt_mapping_for_external_table("stripe_price"), # type: ignore
"endpoint": {
"data_selector": "data",
Expand Down Expand Up @@ -157,7 +157,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
"name": "Product",
"table_name": "product",
"primary_key": "id",
"write_disposition": "merge",
"write_disposition": "merge" if is_incremental else "replace",
"columns": get_dlt_mapping_for_external_table("stripe_product"), # type: ignore
"endpoint": {
"data_selector": "data",
Expand Down Expand Up @@ -186,7 +186,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
"name": "Subscription",
"table_name": "subscription",
"primary_key": "id",
"write_disposition": "merge",
"write_disposition": "merge" if is_incremental else "replace",
"columns": get_dlt_mapping_for_external_table("stripe_subscription"), # type: ignore
"endpoint": {
"data_selector": "data",
Expand Down Expand Up @@ -263,7 +263,7 @@ def stripe_source(
},
"resource_defaults": {
"primary_key": "id",
"write_disposition": "merge",
"write_disposition": "merge" if is_incremental else "replace",
},
"resources": [get_resource(endpoint, is_incremental)],
}
Expand Down
20 changes: 10 additions & 10 deletions posthog/temporal/data_imports/pipelines/zendesk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
"name": "brands",
"table_name": "brands",
"primary_key": "id",
"write_disposition": "merge",
"write_disposition": "merge" if is_incremental else "replace",
"columns": get_dlt_mapping_for_external_table("zendesk_brands"), # type: ignore
"endpoint": {
"data_selector": "brands",
Expand All @@ -30,7 +30,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
"name": "organizations",
"table_name": "organizations",
"primary_key": "id",
"write_disposition": "merge",
"write_disposition": "merge" if is_incremental else "replace",
"columns": get_dlt_mapping_for_external_table("zendesk_organizations"), # type: ignore
"endpoint": {
"data_selector": "organizations",
Expand All @@ -48,7 +48,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
"name": "groups",
"table_name": "groups",
"primary_key": "id",
"write_disposition": "merge",
"write_disposition": "merge" if is_incremental else "replace",
"columns": get_dlt_mapping_for_external_table("zendesk_groups"), # type: ignore
"endpoint": {
"data_selector": "groups",
Expand All @@ -68,7 +68,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
"name": "sla_policies",
"table_name": "sla_policies",
"primary_key": "id",
"write_disposition": "merge",
"write_disposition": "merge" if is_incremental else "replace",
"columns": get_dlt_mapping_for_external_table("zendesk_sla_policies"), # type: ignore
"endpoint": {
"data_selector": "sla_policies",
Expand All @@ -83,7 +83,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
"name": "users",
"table_name": "users",
"primary_key": "id",
"write_disposition": "merge",
"write_disposition": "merge" if is_incremental else "replace",
"columns": get_dlt_mapping_for_external_table("zendesk_users"), # type: ignore
"endpoint": {
"data_selector": "users",
Expand All @@ -106,7 +106,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
"name": "ticket_fields",
"table_name": "ticket_fields",
"primary_key": "id",
"write_disposition": "merge",
"write_disposition": "merge" if is_incremental else "replace",
"columns": get_dlt_mapping_for_external_table("zendesk_ticket_fields"), # type: ignore
"endpoint": {
"data_selector": "ticket_fields",
Expand All @@ -127,7 +127,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
"name": "ticket_events",
"table_name": "ticket_events",
"primary_key": "id",
"write_disposition": "merge",
"write_disposition": "merge" if is_incremental else "replace",
"columns": get_dlt_mapping_for_external_table("zendesk_ticket_events"), # type: ignore
"endpoint": {
"data_selector": "ticket_events",
Expand All @@ -150,7 +150,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
"name": "tickets",
"table_name": "tickets",
"primary_key": "id",
"write_disposition": "merge",
"write_disposition": "merge" if is_incremental else "replace",
"columns": get_dlt_mapping_for_external_table("zendesk_tickets"), # type: ignore
"endpoint": {
"data_selector": "tickets",
Expand All @@ -172,7 +172,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
"name": "ticket_metric_events",
"table_name": "ticket_metric_events",
"primary_key": "id",
"write_disposition": "merge",
"write_disposition": "merge" if is_incremental else "replace",
"columns": get_dlt_mapping_for_external_table("zendesk_ticket_metric_events"), # type: ignore
"endpoint": {
"data_selector": "ticket_metric_events",
Expand Down Expand Up @@ -263,7 +263,7 @@ def zendesk_source(
},
"resource_defaults": {
"primary_key": "id",
"write_disposition": "merge",
"write_disposition": "merge" if is_incremental else "replace",
},
"resources": [get_resource(endpoint, is_incremental)],
}
Expand Down
5 changes: 1 addition & 4 deletions posthog/warehouse/models/external_data_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,7 @@ class Status(models.TextChoices):
__repr__ = sane_repr("id")

def folder_path(self) -> str:
if self.schema and self.schema.is_incremental:
return f"team_{self.team_id}_{self.pipeline.source_type}_{str(self.schema_id)}".lower().replace("-", "_")

return f"team_{self.team_id}_{self.pipeline.source_type}_{str(self.pk)}".lower().replace("-", "_")
return f"team_{self.team_id}_{self.pipeline.source_type}_{str(self.schema_id)}".lower().replace("-", "_")

def url_pattern_by_schema(self, schema: str) -> str:
if TEST:
Expand Down

0 comments on commit 48e0672

Please sign in to comment.