From b110a4316de264207e5482634eb5e43538561524 Mon Sep 17 00:00:00 2001 From: Eric Duong Date: Tue, 1 Oct 2024 14:41:23 -0400 Subject: [PATCH] fix(data-warehouse): add missing config for salesforce incremental (#25322) --- .../pipelines/salesforce/__init__.py | 81 ++++++++++++++----- 1 file changed, 63 insertions(+), 18 deletions(-) diff --git a/posthog/temporal/data_imports/pipelines/salesforce/__init__.py b/posthog/temporal/data_imports/pipelines/salesforce/__init__.py index 36251d1dd5875..cd206b6adcd4f 100644 --- a/posthog/temporal/data_imports/pipelines/salesforce/__init__.py +++ b/posthog/temporal/data_imports/pipelines/salesforce/__init__.py @@ -16,8 +16,13 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource: "User": { "name": "User", "table_name": "user", - **({"primary_key": "id"} if is_incremental else {}), - "write_disposition": "replace", + **({"primary_key": "Id"} if is_incremental else {}), + "write_disposition": { + "disposition": "merge", + "strategy": "upsert", + } + if is_incremental + else "replace", "endpoint": { "data_selector": "records", "path": "/services/data/v61.0/query", @@ -37,8 +42,13 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource: "UserRole": { "name": "UserRole", "table_name": "user_role", - **({"primary_key": "id"} if is_incremental else {}), - "write_disposition": "replace", + **({"primary_key": "Id"} if is_incremental else {}), + "write_disposition": { + "disposition": "merge", + "strategy": "upsert", + } + if is_incremental + else "replace", "endpoint": { "data_selector": "records", "path": "/services/data/v61.0/query", @@ -58,8 +68,13 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource: "Lead": { "name": "Lead", "table_name": "lead", - **({"primary_key": "id"} if is_incremental else {}), - "write_disposition": "replace", + **({"primary_key": "Id"} if is_incremental else {}), + "write_disposition": { + "disposition": "merge", + "strategy": "upsert", + } + if is_incremental + else "replace", "endpoint": { "data_selector": "records", "path": "/services/data/v61.0/query", @@ -79,8 +94,13 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource: "Contact": { "name": "Contact", "table_name": "contact", - **({"primary_key": "id"} if is_incremental else {}), - "write_disposition": "replace", + **({"primary_key": "Id"} if is_incremental else {}), + "write_disposition": { + "disposition": "merge", + "strategy": "upsert", + } + if is_incremental + else "replace", "endpoint": { "data_selector": "records", "path": "/services/data/v61.0/query", @@ -100,8 +120,13 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource: "Campaign": { "name": "Campaign", "table_name": "campaign", - **({"primary_key": "id"} if is_incremental else {}), - "write_disposition": "replace", + **({"primary_key": "Id"} if is_incremental else {}), + "write_disposition": { + "disposition": "merge", + "strategy": "upsert", + } + if is_incremental + else "replace", "endpoint": { "data_selector": "records", "path": "/services/data/v61.0/query", @@ -121,8 +146,13 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource: "Product2": { "name": "Product2", "table_name": "product2", - **({"primary_key": "id"} if is_incremental else {}), - "write_disposition": "replace", + **({"primary_key": "Id"} if is_incremental else {}), + "write_disposition": { + "disposition": "merge", + "strategy": "upsert", + } + if is_incremental + else "replace", "endpoint": { "data_selector": "records", "path": "/services/data/v61.0/query", @@ -142,8 +172,13 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource: "Pricebook2": { "name": "Pricebook2", "table_name": "pricebook2", - **({"primary_key": "id"} if is_incremental else {}), - "write_disposition": "replace", + **({"primary_key": "Id"} if is_incremental else {}), + "write_disposition": { + "disposition": "merge", + "strategy": "upsert", + } + if is_incremental + else "replace", "endpoint": { "data_selector": "records", "path": "/services/data/v61.0/query", @@ -163,8 +198,13 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource: "PricebookEntry": { "name": "PricebookEntry", "table_name": "pricebook_entry", - **({"primary_key": "id"} if is_incremental else {}), - "write_disposition": "replace", + **({"primary_key": "Id"} if is_incremental else {}), + "write_disposition": { + "disposition": "merge", + "strategy": "upsert", + } + if is_incremental + else "replace", "endpoint": { "data_selector": "records", "path": "/services/data/v61.0/query", @@ -184,8 +224,13 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource: "Order": { "name": "Order", "table_name": "order", - **({"primary_key": "id"} if is_incremental else {}), - "write_disposition": "replace", + **({"primary_key": "Id"} if is_incremental else {}), + "write_disposition": { + "disposition": "merge", + "strategy": "upsert", + } + if is_incremental + else "replace", "endpoint": { "data_selector": "records", "path": "/services/data/v61.0/query",