diff --git a/airbyte-integrations/connectors/source-klaviyo/setup.py b/airbyte-integrations/connectors/source-klaviyo/setup.py index 0c71eecd134e..e2abf00cc000 100644 --- a/airbyte-integrations/connectors/source-klaviyo/setup.py +++ b/airbyte-integrations/connectors/source-klaviyo/setup.py @@ -5,7 +5,7 @@ from setuptools import find_packages, setup -MAIN_REQUIREMENTS = ["airbyte-cdk~=0.1"] +MAIN_REQUIREMENTS = ["airbyte-cdk==0.67"] TEST_REQUIREMENTS = ["pytest~=6.1", "pytest-mock", "connector-acceptance-test", "requests_mock~=1.8"] diff --git a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/campaigns.json b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/campaigns.json index 6e494a5d9b42..8c1fa1e406b8 100644 --- a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/campaigns.json +++ b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/campaigns.json @@ -1,55 +1,140 @@ { "type": "object", "properties": { - "object": { "type": "string" }, + "type": { "type": "string" }, "id": { "type": "string" }, - "name": { "type": "string" }, - "created": { "type": ["null", "string"], "format": "date-time" }, - "updated": { "type": ["null", "string"], "format": "date-time" }, - "status": { "type": "string" }, - "status_id": { "type": "integer" }, - "status_label": { "type": "string" }, - "from_name": { "type": "string" }, - "from_email": { "type": "string" }, - "num_recipients": { "type": "integer" }, - "lists": { - "type": "array", - "items": { - "type": "object", - "properties": { - "object": { "type": "string" }, - "id": { "type": "string" }, - "name": { "type": "string" }, - "created": { "type": "string", "format": "date-time" }, - "updated": { "type": "string", "format": "date-time" }, - "person_count": { "type": "integer" }, - "list_type": { "type": "string" }, - "folder": { "type": ["null", "string"] } - } + "updated_at": { "type": ["null", "string"], "format": "date-time" }, + "attributes": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "name": { "type": "string" }, + "status": { "type": "string" }, + "archived": { "type": "boolean" }, + "channel": { "type": "string" }, + "audiences": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "included": { + "type": ["null", "array"], + "items": { + "type": ["null", "string"] + } + }, + "excluded": { + "type": ["null", "array"], + "items": { + "type": ["null", "string"] + } + } + } + }, + "send_options": { + "type": ["null", "object"], + "properties": { + "ignore_unsubscribes": { "type": ["null", "boolean"] }, + "use_smart_sending": { "type": ["null", "boolean"] } + } + }, + "message": { "type": "string" }, + "tracking_options": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "is_tracking_opens": { "type": ["null", "boolean"] }, + "is_tracking_clicks": { "type": ["null", "boolean"] }, + "is_add_utm": { "type": ["null", "boolean"] }, + "utm_params": { + "type": ["null", "array"], + "items": { + "type": ["null", "object"], + "properties": { + "name": { "type": "string" }, + "value": { "type": "string" } + } + } + } + } + }, + "send_strategy": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "method": { "type": "string" }, + "options_static": { + "type": ["null", "object"], + "properties": { + "datetime": { + "type": "string", + "format": "date-time", + "airbyte_type": "timestamp_without_timezone" + }, + "is_local": { "type": ["null", "boolean"] }, + "send_past_recipients_immediately": { + "type": ["null", "boolean"] + } + } + }, + "options_throttled": { + "type": ["null", "object"], + "properties": { + "datetime": { + "type": "string", + "format": "date-time", + "airbyte_type": "timestamp_without_timezone" + }, + "throttle_percentage": { "type": "integer" } + } + }, + "options_sto": { + "type": ["null", "object"], + "properties": { + "date": { "type": "string", "format": "date" } + } + } + } + }, + "created_at": { "type": ["null", "string"], "format": "date-time" }, + "scheduled_at": { "type": ["null", "string"], "format": "date-time" }, + "updated_at": { "type": ["null", "string"], "format": "date-time" }, + "send_time": { "type": ["null", "string"], "format": "date-time" } } }, - "excluded_lists": { - "type": "array", - "items": { - "type": "object", - "properties": { - "object": { "type": "string" }, - "id": { "type": "string" }, - "name": { "type": "string" }, - "created": { "type": "string", "format": "date-time" }, - "updated": { "type": "string", "format": "date-time" }, - "person_count": { "type": "integer" }, - "list_type": { "type": "string" }, - "folder": { "type": ["null", "string"] } - } + "links": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "self": { "type": "string" } } }, - "is_segmented": { "type": "boolean" }, - "send_time": { "type": ["null", "string"], "format": "date-time" }, - "sent_at": { "type": ["null", "string"], "format": "date-time" }, - "campaign_type": { "type": "string" }, - "subject": { "type": ["null", "string"] }, - "message_type": { "type": "string" }, - "template_id": { "type": ["null", "string"] } + "relationships": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "tags": { + "type": ["null", "object"], + "properties": { + "data": { + "type": "array", + "items": { + "type": ["null", "object"], + "properties": { + "type": { "type": "string" }, + "id": { "type": "string" } + } + } + }, + "links": { + "type": ["null", "object"], + "properties": { + "self": { "type": "string" }, + "related": { "type": "string" } + } + } + } + } + } + } } } diff --git a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/email_templates.json b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/email_templates.json index 09a5cba42c4b..1ecc20730a38 100644 --- a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/email_templates.json +++ b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/email_templates.json @@ -1,27 +1,79 @@ { + "$schema": "http://json-schema.org/draft-07/schema#", "type": "object", "additionalProperties": true, "properties": { - "object": { + "type": { + "type": "string" + }, + "id": { + "type": "string" + }, + "updated": { + "type": [ + "null", + "string" + ], + "format": "date-time" + }, + "attributes": { + "type": [ + "null", + "object" + ], + "additionalProperties": true, + "properties": { + "name": { "type": "string" - }, - "id": { + }, + "editor_type": { + "type": [ + "null", + "string" + ] + }, + "html": { "type": "string" - }, - "name": { - "type": ["null", "string"] - }, - "html": { - "type": ["null", "string"] - }, - "is_writeable": { - "type": ["null", "boolean"] - }, - "created": { - "type": "string", "format": "date-time" - }, - "updated": { - "type": "string", "format": "date-time" + }, + "text": { + "type": [ + "null", + "string" + ] + }, + "created": { + "type": [ + "null", + "string" + ], + "format": "date-time" + }, + "updated": { + "type": [ + "null", + "string" + ], + "format": "date-time" + }, + "company_id": { + "type": [ + "null", + "string" + ] + } } + }, + "links": { + "type": [ + "null", + "object" + ], + "additionalProperties": true, + "properties": { + "self": { + "type": "string" + } + } + } } -} + } \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/events.json b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/events.json index 7cf830f697a8..908f8d92e2f5 100644 --- a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/events.json +++ b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/events.json @@ -1,42 +1,133 @@ { "type": "object", - "properties": { - "object": { "type": "string" }, - "id": { "type": "string" }, - "uuid": { "type": "string" }, - "event_name": { "type": "string" }, - "timestamp": { "type": "integer" }, - "datetime": { "type": "string" }, - "statistic_id": { "type": "string" }, - "event_properties": { - "type": "object", + "properties": { + "type": { + "type": "string" + }, + "id": { + "type": "string" + }, + "datetime": { + "type": "string", + "format": "date-time" + }, + "attributes": { + "type": [ + "null", + "object" + ], "properties": { - "$value": { "type": "number" }, - "items": { - "type": "array", - "items": { - "type": "object", - "properties": { - "object": { "type": "string" }, - "name": { "type": "string" }, - "sku": { "type": "string" }, - "price": { "type": "number" }, - "quantity": { "type": "integer" } - } - } + "timestamp": { + "type": "integer" + }, + "event_properties": { + "type": [ + "null", + "object" + ], + "additionalProperties": true + }, + "datetime": { + "type": "string", + "format": "date-time" + }, + "uuid": { + "type": "string" } } }, - "person": { - "type": "object", + "links": { + "type": [ + "null", + "object" + ], "properties": { - "id": { "type": "string" }, - "object": { "type": "string" }, - "$email": { "type": "string" } + "self": { + "type": "string" + } } }, - "flow_id": { "type": ["null", "string"] }, - "campaign_id": { "type": ["null", "string"] }, - "flow_message_id": { "type": ["null", "string"] } + "relationships": { + "type": [ + "null", + "object" + ], + "properties": { + "profile": { + "type": [ + "null", + "object" + ], + "properties": { + "data": { + "type": [ + "null", + "object" + ], + "properties": { + "type": { + "type": "string" + }, + "id": { + "type": "string" + } + } + }, + "links": { + "type": [ + "null", + "object" + ], + "additionalProperties": true, + "properties": { + "self": { + "type": "string" + }, + "related": { + "type": "string" + } + } + } + } + }, + "metric": { + "type": [ + "null", + "object" + ], + "properties": { + "data": { + "type": [ + "null", + "object" + ], + "properties": { + "type": { + "type": "string" + }, + "id": { + "type": "string" + } + } + }, + "links": { + "type": [ + "null", + "object" + ], + "additionalProperties": true, + "properties": { + "self": { + "type": "string" + }, + "related": { + "type": "string" + } + } + } + } + } + } + } } } diff --git a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/flows.json b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/flows.json index 54ea3dded05c..50ce2ba1727d 100644 --- a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/flows.json +++ b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/flows.json @@ -1,13 +1,80 @@ { + "$schema": "http://json-schema.org/draft-07/schema#", "type": "object", + "additionalProperties": true, "properties": { - "object": { "type": "string" }, + "type": { "type": "string" }, "id": { "type": "string" }, - "name": { "type": "string" }, - "status": { "type": "string" }, - "created": { "type": "string", "format": "date-time" }, "updated": { "type": "string", "format": "date-time" }, - "customer_filter": { "type": ["null", "object"] }, - "trigger": { "type": "object" } + "attributes": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "name": { "type": "string" }, + "status": { "type": "string" }, + "archived": { "type": "boolean" }, + "created": { "type": "string", "format": "date-time" }, + "updated": { "type": "string", "format": "date-time" }, + "trigger_type": { "type": "string" } + } + }, + "links": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "self": { "type": "string" } + } + }, + "relationships": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "flow-actions": { + "type": ["null", "object"], + "properties": { + "data": { + "type": "array", + "items": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "type": { "type": "string" }, + "id": { "type": "string" } + } + } + }, + "links": { + "type": ["null", "object"], + "properties": { + "self": { "type": "string" }, + "related": { "type": "string" } + } + } + } + }, + "tags": { + "type": ["null", "object"], + "properties": { + "data": { + "type": "array", + "items": { + "type": ["null", "object"], + "properties": { + "type": { "type": "string" }, + "id": { "type": "string" } + } + } + }, + "links": { + "type": ["null", "object"], + "properties": { + "self": { "type": "string" }, + "related": { "type": "string" } + } + } + } + } + } + } } -} +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/global_exclusions.json b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/global_exclusions.json index 84e21e29f70e..a7ca8550707d 100644 --- a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/global_exclusions.json +++ b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/global_exclusions.json @@ -1,9 +1,32 @@ { "type": "object", + "additionalProperties": true, "properties": { - "object": { "type": "string" }, - "email": { "type": "string" }, - "reason": { "type": "string" }, - "timestamp": { "type": "string", "format": "date-time" } + "type": { "type": ["null", "string"] }, + "id": { "type": "string" }, + "updated": { "type": ["null", "string"], "format": "date-time" }, + "attributes": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "email": { "type": ["null", "string"] }, + "phone_number": { "type": ["null", "string"] }, + "first_name": { "type": ["null", "string"] }, + "last_name": { "type": ["null", "string"] }, + "properties": { + "type": ["null", "object"], + "additionalProperties": true + }, + "subscriptions": { "type": ["null", "object"] }, + "organization": { "type": ["null", "string"] }, + "title": { "type": ["null", "string"] }, + "created": { "type": ["null", "string"], "format": "date-time" }, + "updated": { "type": ["null", "string"], "format": "date-time" }, + "last_event_date": { "type": ["null", "string"], "format": "date-time" } + } + }, + "links": { "type": ["null", "object"] }, + "relationships": { "type": ["null", "object"] }, + "segments": { "type": ["null", "object"] } } } diff --git a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/lists.json b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/lists.json index da721281b4e4..a8486090cc97 100644 --- a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/lists.json +++ b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/lists.json @@ -1,13 +1,71 @@ { + "$schema": "http://json-schema.org/draft-07/schema#", "type": "object", + "additionalProperties": true, "properties": { - "object": { "type": "string" }, + "type": { "type": "string" }, "id": { "type": "string" }, - "name": { "type": "string" }, - "created": { "type": "string", "format": "date-time" }, "updated": { "type": "string", "format": "date-time" }, - "person_count": { "type": "integer" }, - "list_type": { "type": "string" }, - "folder": { "type": ["null", "string"] } + "attributes": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "name": { "type": "string" }, + "created": { "type": "string", "format": "date-time" }, + "updated": { "type": "string", "format": "date-time" }, + "opt_in_process": { "type": ["string", "null"] } + } + }, + "links": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "self": { "type": "string" } + } + }, + "relationships": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "profiles": { + "type": ["null", "object"], + "properties" : { + "links": { + "type" : ["null", "object"], + "properties" : { + "self": { + "type" : "string" + }, + "related": { + "type" : "string" + } + } + } + } + }, + "tags": { + "type": ["null", "object"], + "properties": { + "data": { + "type": "array", + "items": { + "type": ["null", "object"], + "properties": { + "type": { "type": "string" }, + "id": { "type": "string" } + } + } + }, + "links": { + "type": ["null", "object"], + "properties": { + "self": { "type": "string" }, + "related": { "type": "string" } + } + } + } + } + } + } } -} +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/metrics.json b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/metrics.json index bfdb31a0e45e..e2ceda51361a 100644 --- a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/metrics.json +++ b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/metrics.json @@ -1,19 +1,28 @@ { + "$schema": "http://json-schema.org/draft-07/schema#", "type": "object", + "additionalProperties": true, "properties": { - "object": { "type": "string" }, + "type": { "type": "string" }, "id": { "type": "string" }, - "name": { "type": "string" }, - "created": { "type": "string", "format": "date-time" }, - "updated": { "type": "string", "format": "date-time" }, - "integration": { - "type": "object", + "attributes": { + "type": ["null", "object"], "properties": { - "object": { "type": "string" }, - "id": { "type": "string" }, "name": { "type": "string" }, - "category": { "type": "string" } + "created": { "type": "string", "format": "date-time" }, + "updated": { "type": "string", "format": "date-time" }, + "integration": { + "type": [ "null", "object"], + "additionalProperties": true + } + } + }, + "links": { + "type": [ "null", "object" ], + "additionalProperties": true, + "properties": { + "self": { "type": "string"} } } } -} +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/segments.json b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/segments.json new file mode 100644 index 000000000000..7945c2cee6e9 --- /dev/null +++ b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/segments.json @@ -0,0 +1,213 @@ +{ + "type": "object", + "additionalProperties": true, + "properties": + { + "type": + { + "type": + [ + "null", + "string" + ] + }, + "id": + { + "type": "string" + }, + "updated": + { + "type": + [ + "null", + "string" + ], + "format": "date-time" + }, + "attributes": + { + "additionalProperties": true, + "type": + [ + "null", + "object" + ], + "properties": + { + "name": + { + "type": "string" + }, + "additionalProperties": true, + "is_active": + { + "type": "boolean" + }, + "is_processing": + { + "type": "boolean" + }, + "is_started": + { + "type": "boolean" + }, + "created": + { + "type": + [ + "null", + "string" + ], + "format": "date-time" + }, + "updated": + { + "type": + [ + "null", + "string" + ], + "format": "date-time" + } + } + }, + "links": + { + "type": + [ + "null", + "object" + ], + "additionalProperties": true, + "properties": + { + "self": + { + "type": "string" + } + } + }, + "relationships": + { + "type": + [ + "null", + "object" + ], + "properties": + { + "profiles": + { + "type": + [ + "null", + "object" + ], + "properties": + { + "links": + { + "type": + [ + "null", + "object" + ], + "properties": + { + "self": + { + "type": "string" + }, + "related": + { + "type": "string" + } + } + } + } + }, + "tags": + { + "type": + [ + "null", + "object" + ], + "date": + { + "type": + { + "type": "string" + }, + "id": + { + "type": "string" + } + }, + "links": + { + "type": + [ + "null", + "object" + ], + "properties": + { + "self": + { + "type": "string" + }, + "related": + { + "type": "string" + } + } + } + } + } + }, + "included": + { + "type": + { + "type": "string" + }, + "id": + { + "type": "string" + }, + "attributes": + { + "type": + [ + "null", + "object" + ], + "additionalProperties": true, + "properties": + { + "name": + { + "type": "string" + } + } + }, + "links": + { + "type": + [ + "null", + "objects" + ], + "properties": + { + "additionalProperties": true, + "self": + { + "type": "string" + } + } + } + } + } +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/segments_profiles.json b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/segments_profiles.json new file mode 100644 index 000000000000..b774efbcf72b --- /dev/null +++ b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/segments_profiles.json @@ -0,0 +1,153 @@ +{ + "type": "object", + "additionalProperties": true, + "properties": { + "type": {"type": ["null", "string"]}, + "id": {"type": "string"}, + "attributes": { + "email": {"type": ["null", "string"]}, + "phone_number": {"type": ["null", "string"]}, + "external_id" : {"type": ["null", "string"]}, + "first_name": {"type": ["null", "string"]}, + "last_name": {"type": ["null", "string"]}, + "organization": {"type": ["null", "string"]}, + "title": {"type": ["null", "string"]}, + "image": {"type": ["null", "string"]}, + "updated": {"type": ["null", "string"], "format": "date-time"}, + "created": {"type": ["null", "string"], "format": "date-time"}, + "last_event_date": {"type": ["null", "string"], "format": "date-time"}, + "joined_group_at": {"type": ["null", "string"], "format": "date-time"}, + "location": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "address1": {"type": ["null", "string"]}, + "address2": {"type": ["null", "string"]}, + "city": {"type": ["null", "string"]}, + "country": {"type": ["null", "string"]}, + "latitude": {"type": ["null", "string"]}, + "longitude": {"type": ["null", "string"]}, + "region": {"type": ["null", "string"]}, + "zip": {"type": ["null", "string"]}, + "timezone": {"type": ["null", "string"]}, + "ip": {"type": ["null", "string"]} + }, + "subscriptions": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "email": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "marketing": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "can_receive_email_marketing": {"type": "boolean"}, + "consent": {"type": ["null", "string"]}, + "consent_timestamp": {"type": ["null", "string"], "format": "date-time"}, + "last_updated": {"type": ["null", "string"], "format": "date-time"}, + "method": {"type": ["null", "string"]}, + "method_details": {"type": ["null", "string"]}, + "custom_method_detail": {"type": ["null", "string"]}, + "double_optin" : {"type": ["null", "string"]}, + "suppression": { + "type": ["null", "array"], + "items": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "reason" : {"type" : ["null", "string"]}, + "timestamp" : {"type" : ["null", "string"], "format" : "date-time"} + } + } + }, + "list_suppressions": { + "type": ["null", "array"], + "items": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "list_id" : {"type" : ["null", "string"]}, + "reason" : {"type" : ["null", "string"]}, + "timestamp" : {"type" : ["null", "string"], "format" : "date-time"} + } + } + } + } + } + } + }, + "sms": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "marketing": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "can_receive_sms_marketing": {"type": "boolean"}, + "consent": {"type": ["null", "string"]}, + "consent_timestamp": {"type": ["null", "string"], "format": "date-time"}, + "method": {"type": ["null", "string"]}, + "method_details": {"type": ["null", "string"]}, + "last_updated": {"type": ["null", "string"], "format": "date-time"} + } + } + } + } + } + }, + "predictive_analytics": { + "historic_clv": {"type": ["null", "number"]}, + "predicted_clv": {"type": ["null", "number"]}, + "total_clv": {"type": ["null", "number"]}, + "historic_number_of_orders": {"type": ["null", "number"]}, + "predicted_number_of_orders": {"type": ["null", "number"]}, + "average_days_between_orders": {"type": ["null", "number"]}, + "average_order_value": {"type": ["null", "number"]}, + "churn_probability": {"type": ["null", "number"]}, + "expected_date_of_next_order": {"type": ["null", "string"], "format": "date-time"} + }, + "links": { + "type": ["null", "object"], + "properties": { + "self": { "type": "string" }, + "related": { "type": "string" } + } + }, + "relationships": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "lists": { + "type": ["null", "object"], + "properties" : { + "links": { + "type" : ["null", "object"], + "properties" : { + "self": {"type" : "string"}, + "related": {"type" : "string"} + } + } + } + }, + "segments": { + "type": ["null", "object"], + "properties" : { + "links": { + "type" : ["null", "object"], + "properties" : { + "self": {"type" : "string"}, + "related": {"type" : "string"} + } + } + } + } + } + } + } + } + } +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/source.py b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/source.py index 4843cd213aa1..160e3470bb64 100644 --- a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/source.py +++ b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/source.py @@ -8,7 +8,7 @@ from airbyte_cdk.models import SyncMode from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources.streams import Stream -from source_klaviyo.streams import Campaigns, EmailTemplates, Events, Flows, GlobalExclusions, Lists, Metrics, Profiles +from source_klaviyo.streams import Campaigns, EmailTemplates, Events, Flows, GlobalExclusions, Lists, Metrics, Profiles, Segments, SegmentsProfiles class SourceKlaviyo(AbstractSource): @@ -41,12 +41,14 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: api_key = config["api_key"] start_date = config["start_date"] return [ - Campaigns(api_key=api_key), + Campaigns(api_key=api_key, start_date=start_date), Events(api_key=api_key, start_date=start_date), GlobalExclusions(api_key=api_key, start_date=start_date), - Lists(api_key=api_key), + Lists(api_key=api_key, start_date=start_date), Metrics(api_key=api_key), Flows(api_key=api_key, start_date=start_date), - EmailTemplates(api_key=api_key), + EmailTemplates(api_key=api_key, start_date=start_date), Profiles(api_key=api_key, start_date=start_date), + Segments(api_key=api_key, start_date=start_date), + SegmentsProfiles(api_key=api_key, start_date=start_date) ] diff --git a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py index bb2e35e30abe..706bcc39b048 100644 --- a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py +++ b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py @@ -9,8 +9,8 @@ from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union import pendulum -from datetime import timedelta import requests +from airbyte_cdk.models import SyncMode from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy from airbyte_cdk.sources.streams.http import HttpStream from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer @@ -22,6 +22,7 @@ class KlaviyoStreamLatest(HttpStream, ABC): url_base = "https://a.klaviyo.com/api/" primary_key = "id" page_size = 100 + include = None def __init__(self, api_key: str, **kwargs): super().__init__(**kwargs) @@ -37,7 +38,7 @@ def request_headers(self, **kwargs) -> Mapping[str, Any]: headers = { "Accept": "application/json", "Content-Type": "application/json", - "Revision": "2023-02-22", + "Revision": "2024-05-15", "Authorization": "Klaviyo-API-Key " + self._api_key, } @@ -65,8 +66,12 @@ def request_params( # If next_page_token is set, all of the parameters are already provided if next_page_token: return next_page_token - else: - return {"page[size]": self.page_size} + params = {} + if self.page_size: + params["page[size]"] = self.page_size + if self.include: + params["include"] = self.include + return params def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: """:return an iterable containing each record in the response""" @@ -87,6 +92,10 @@ def __init__(self, start_date: str, **kwargs): super().__init__(**kwargs) self._start_ts = start_date + def map_record(self, record: Mapping): + record[self.cursor_field] = record["attributes"][self.cursor_field] + return record + @property @abstractmethod def cursor_field(self) -> Union[str, List[str]]: @@ -96,7 +105,12 @@ def cursor_field(self) -> Union[str, List[str]]: :return str: The name of the cursor field. """ - def request_params(self, stream_state: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None, **kwargs): + def request_params( + self, + stream_state: Mapping[str, Any] = None, + stream_slice: Optional[Mapping[str, Any]] = None, + next_page_token: Mapping[str, Any] = None, + **kwargs): """Add incremental filters""" stream_state = stream_state or {} @@ -122,17 +136,63 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late return {self.cursor_field: latest_cursor.isoformat()} +class IncrementalKlaviyoStreamLatestWithArchivedRecords(IncrementalKlaviyoStreamLatest, ABC): + + def stream_slices( + self, *, sync_mode: SyncMode, cursor_field: Optional[List[str]] = None, stream_state: Optional[Mapping[str, Any]] = None + ) -> Iterable[Optional[Mapping[str, Any]]]: + return [{"archived": value} for value in [False, True]] + + def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: + archived = latest_record.get("attributes", {}).get("archived", False) + if archived: + current_stream_cursor_value = current_stream_state.get("archived", {}).get(self.cursor_field, self._start_ts) + latest_record_cursor_value = latest_record[self.cursor_field] + latest_cursor = max(pendulum.parse(latest_record_cursor_value), pendulum.parse(current_stream_cursor_value)) + current_stream_state["archived"] = {self.cursor_field: str(latest_cursor)} + return current_stream_state + else: + return super().get_updated_state(current_stream_state, latest_record) + + def request_params( + self, + stream_state: Mapping[str, Any] = None, + stream_slice: Optional[Mapping[str, Any]] = None, + next_page_token: Mapping[str, Any] = None, + **kwargs): + current_stream_state = stream_state + if stream_state.get("archived"): + current_stream_state = stream_state.get("archived", {}) + params = super().request_params(current_stream_state, stream_slice, next_page_token) + archived = stream_slice.get("archived", False) + if archived: + archived_filter = "equals(archived,true)" + if "filter" in params and archived_filter not in params["filter"]: + params["filter"] = f"and({params['filter']},{archived_filter})" + elif "filter" not in params: + params["filter"] = archived_filter + return params + + class Profiles(IncrementalKlaviyoStreamLatest): """Docs: https://developers.klaviyo.com/en/reference/get_profiles""" cursor_field = "updated" + page_size = 100 def path(self, *args, next_page_token: Optional[Mapping[str, Any]] = None, **kwargs) -> str: return "profiles" - def map_record(self, record: Mapping): - record[self.cursor_field] = record["attributes"][self.cursor_field] - return record + def request_params( + self, + stream_state: Mapping[str, Any] = None, + stream_slice: Optional[Mapping[str, Any]] = None, + next_page_token: Mapping[str, Any] = None, + **kwargs): + """Add incremental filters""" + params = super().request_params(stream_state, stream_slice, next_page_token) + params["additional-fields[profile]"] = "predictive_analytics,subscriptions" + return params class KlaviyoStreamV1(HttpStream, ABC): @@ -275,7 +335,7 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, decoded_response = response.json() if decoded_response.get("next"): return {"since": decoded_response["next"]} - + data = decoded_response.get("data", [{}]) or [{}] self.logger.info("Last timestamp -> " + str(data[-1].get("timestamp", "No timestamp"))) @@ -357,35 +417,105 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp yield record -class Campaigns(KlaviyoStreamV1): +class Campaigns(IncrementalKlaviyoStreamLatest): """Docs: https://developers.klaviyo.com/en/reference/get-campaigns""" - def path(self, **kwargs) -> str: + cursor_field = "updated_at" + page_size = None + current_channel = None + + def path(self, *args, next_page_token: Optional[Mapping[str, Any]] = None, **kwargs) -> str: return "campaigns" + def stream_slices( + self, *, sync_mode: SyncMode, cursor_field: Optional[List[str]] = None, stream_state: Optional[Mapping[str, Any]] = None + ) -> Iterable[Optional[Mapping[str, Any]]]: + archived_flags = [False, True] + message_channels = ["email", "sms"] + return [{"archived": flag, "channel": channel} for flag in archived_flags for channel in message_channels] + + def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: + archived = latest_record.get("attributes", {}).get("archived", False) + updated_state = super().get_updated_state(current_stream_state, latest_record) + if archived: + current_stream_cursor_value = current_stream_state.get("archived", {}).get(self.cursor_field, self._start_ts) + latest_record_cursor_value = latest_record[self.cursor_field] + latest_cursor = max(pendulum.parse(latest_record_cursor_value), pendulum.parse(current_stream_cursor_value)) + current_stream_state["archived"] = {self.current_channel: {self.cursor_field: latest_cursor}} + else: + current_stream_state[self.current_channel] = updated_state + return current_stream_state + + def request_params( + self, + stream_state: Mapping[str, Any] = None, + stream_slice: Optional[Mapping[str, Any]] = None, + next_page_token: Mapping[str, Any] = None, + **kwargs): + archived = stream_slice.get("archived", False) + channel = stream_slice.get("channel", False) + if archived: + current_stream_state = stream_state.get("archived", {}).get(channel, {}) + else: + current_stream_state = stream_state.get(channel, {}) + params = super().request_params(current_stream_state, stream_slice, next_page_token) + + self.current_channel = channel + channel_filter = f"equals(messages.channel,'{channel}')" + if "filter" in params and channel_filter not in params["filter"]: + params["filter"] = f"{params['filter']},{channel_filter}" + elif "filter" not in params: + params["filter"] = channel_filter + + if archived: + archived_filter = "equals(archived,true)" + if "filter" in params and archived_filter not in params["filter"]: + params["filter"] = f"{params['filter']},{archived_filter}" + elif "filter" not in params: + params["filter"] = archived_filter + return params + -class Lists(KlaviyoStreamV1): +class Lists(IncrementalKlaviyoStreamLatest): """Docs: https://developers.klaviyo.com/en/reference/get-lists""" + cursor_field = "updated" max_retries = 10 + page_size = None + include = "tags" def path(self, **kwargs) -> str: return "lists" -class GlobalExclusions(ReverseIncrementalKlaviyoStreamV1): +class GlobalExclusions(Profiles): """Docs: https://developers.klaviyo.com/en/reference/get-global-exclusions""" + suppression_fields = ["attributes", "subscriptions", "email", "marketing", "suppression"] - page_size = 5000 # the maximum value allowed by API - cursor_field = "timestamp" - primary_key = "email" + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + """:return an iterable containing each record in the response""" + response_json = response.json() + for record in response_json.get("data", []): # API returns records in a container array "data" + record = self.map_record(record) + if record: + yield record - def path(self, **kwargs) -> str: - return "people/exclusions" + def map_record(self, record: Mapping): + nested_record = record + for field in self.suppression_fields: + if field not in nested_record: + return None + nested_record = nested_record[field] + if not nested_record: + return None + + record[self.cursor_field] = record["attributes"][self.cursor_field] + return record -class Metrics(KlaviyoStreamV1): +class Metrics(KlaviyoStreamLatest): """Docs: https://developers.klaviyo.com/en/reference/get-metrics""" + page_size = None def path(self, **kwargs) -> str: return "metrics" @@ -410,72 +540,100 @@ def flatten_dict(rec, level): return processed_record -class Events(IncrementalKlaviyoStreamV1): +class Events(IncrementalKlaviyoStreamLatest): """Docs: https://developers.klaviyo.com/en/reference/metrics-timeline""" - cursor_field = "timestamp" + cursor_field = "datetime" + page_size = None + include = "attributions,metric,profile" def __init__(self, **kwargs): super().__init__(**kwargs) self.last_next_token = None - @property - def look_back_window_in_seconds(self) -> Optional[int]: - return timedelta(minutes=30).seconds - def path(self, **kwargs) -> str: - return "metrics/timeline" + return "events" + + @property + def state_checkpoint_interval(self) -> Optional[int]: + return 5000 def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: """:return an iterable containing each record in the response""" response_json = response.json() for record in response_json.get("data", []): - flow = record["event_properties"].get("$flow") - flow_message_id = record["event_properties"].get("$message") + attributes = record["attributes"] + flow = attributes.get("$flow") + flow_message_id = attributes.get("$message") record["flow_id"] = flow record["flow_message_id"] = flow_message_id record["campaign_id"] = flow_message_id if not flow else None + record[self.cursor_field] = attributes[self.cursor_field] yield process_record(record) - def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: - super_state = super().get_updated_state(current_stream_state, latest_record) - super_state["last_next_token"] = self.last_next_token - return super_state - def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: - """ - This method should return a Mapping (e.g: dict) containing whatever information required to make paginated requests. This dict is passed - to most other methods in this class to help you form headers, request bodies, query params, etc.. - :param response: the most recent response from the API - :return If there is another page in the result, a mapping (e.g: dict) containing information needed to query the next page in the response. - If there are no more pages in the result, return None. - """ - decoded_response = response.json() - if decoded_response.get("next"): - next_token = decoded_response["next"] - self.last_next_token = next_token - return {"since": next_token} +class Flows(IncrementalKlaviyoStreamLatestWithArchivedRecords): + cursor_field = "updated" + page_size = None + include = "flow-actions,tags" - self.last_next_token = None - data = decoded_response.get("data", [{}]) or [{}] - self.logger.info("Last timestamp -> " + str(data[-1].get("timestamp", "No timestamp"))) + def path(self, *args, next_page_token: Optional[Mapping[str, Any]] = None, **kwargs) -> str: + return "flows" - return None -class Flows(ReverseIncrementalKlaviyoStreamV1): - cursor_field = "created" +class EmailTemplates(IncrementalKlaviyoStreamLatest): + """ + Docs: https://developers.klaviyo.com/en/v1-2/reference/get-templates + """ + + page_size = None + cursor_field = "updated" def path(self, **kwargs) -> str: - return "flows" + return "templates" -class EmailTemplates(KlaviyoStreamV1): +class Segments(IncrementalKlaviyoStreamLatest): """ Docs: https://developers.klaviyo.com/en/v1-2/reference/get-templates """ + page_size = None + cursor_field = "updated" + def path(self, **kwargs) -> str: - return "email-templates" + return "segments" + + +class SegmentsProfiles(KlaviyoStreamLatest): + parent_id: str = "id" + page_size = 100 + + def __init__(self, start_date: str, **kwargs): + super().__init__(**kwargs) + self._start_ts = start_date + + def stream_slices( + self, *, sync_mode: SyncMode, cursor_field: Optional[List[str]] = None, stream_state: Optional[Mapping[str, Any]] = None + ) -> Iterable[Optional[Mapping[str, Any]]]: + parent_stream = Segments(api_key=self._api_key, start_date=self._start_ts) + slices = parent_stream.stream_slices(sync_mode=SyncMode.full_refresh) + for _slice in slices: + yield from parent_stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=_slice) + + def request_params( + self, + stream_state: Mapping[str, Any] = None, + stream_slice: Optional[Mapping[str, Any]] = None, + next_page_token: Mapping[str, Any] = None, + **kwargs): + """Add incremental filters""" + params = super().request_params(stream_state, stream_slice, next_page_token) + params["additional-fields[profile]"] = "subscriptions,predictive_analytics" + return params + + def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str: + return f"segments/{stream_slice[self.parent_id]}/profiles"