From f237e3a0c1c779cfb8ead0dab67817748598734a Mon Sep 17 00:00:00 2001 From: Alexandros Milaios Date: Thu, 23 May 2024 17:44:22 +0300 Subject: [PATCH 1/8] feat: klaviyo v2 - campaigns - flows --- .../connectors/source-klaviyo/setup.py | 2 +- .../source_klaviyo/schemas/campaigns.json | 175 +++++++++++++----- .../source_klaviyo/schemas/flows.json | 81 +++++++- .../source-klaviyo/source_klaviyo/source.py | 2 +- .../source-klaviyo/source_klaviyo/streams.py | 123 +++++++++++- 5 files changed, 321 insertions(+), 62 deletions(-) diff --git a/airbyte-integrations/connectors/source-klaviyo/setup.py b/airbyte-integrations/connectors/source-klaviyo/setup.py index 0c71eecd134e..e2abf00cc000 100644 --- a/airbyte-integrations/connectors/source-klaviyo/setup.py +++ b/airbyte-integrations/connectors/source-klaviyo/setup.py @@ -5,7 +5,7 @@ from setuptools import find_packages, setup -MAIN_REQUIREMENTS = ["airbyte-cdk~=0.1"] +MAIN_REQUIREMENTS = ["airbyte-cdk==0.67"] TEST_REQUIREMENTS = ["pytest~=6.1", "pytest-mock", "connector-acceptance-test", "requests_mock~=1.8"] diff --git a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/campaigns.json b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/campaigns.json index 6e494a5d9b42..8c1fa1e406b8 100644 --- a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/campaigns.json +++ b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/campaigns.json @@ -1,55 +1,140 @@ { "type": "object", "properties": { - "object": { "type": "string" }, + "type": { "type": "string" }, "id": { "type": "string" }, - "name": { "type": "string" }, - "created": { "type": ["null", "string"], "format": "date-time" }, - "updated": { "type": ["null", "string"], "format": "date-time" }, - "status": { "type": "string" }, - "status_id": { "type": "integer" }, - "status_label": { "type": "string" }, - "from_name": { "type": "string" }, - "from_email": { "type": "string" }, - "num_recipients": { "type": "integer" }, - "lists": { - "type": "array", - "items": { - "type": "object", - "properties": { - "object": { "type": "string" }, - "id": { "type": "string" }, - "name": { "type": "string" }, - "created": { "type": "string", "format": "date-time" }, - "updated": { "type": "string", "format": "date-time" }, - "person_count": { "type": "integer" }, - "list_type": { "type": "string" }, - "folder": { "type": ["null", "string"] } - } + "updated_at": { "type": ["null", "string"], "format": "date-time" }, + "attributes": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "name": { "type": "string" }, + "status": { "type": "string" }, + "archived": { "type": "boolean" }, + "channel": { "type": "string" }, + "audiences": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "included": { + "type": ["null", "array"], + "items": { + "type": ["null", "string"] + } + }, + "excluded": { + "type": ["null", "array"], + "items": { + "type": ["null", "string"] + } + } + } + }, + "send_options": { + "type": ["null", "object"], + "properties": { + "ignore_unsubscribes": { "type": ["null", "boolean"] }, + "use_smart_sending": { "type": ["null", "boolean"] } + } + }, + "message": { "type": "string" }, + "tracking_options": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "is_tracking_opens": { "type": ["null", "boolean"] }, + "is_tracking_clicks": { "type": ["null", "boolean"] }, + "is_add_utm": { "type": ["null", "boolean"] }, + "utm_params": { + "type": ["null", "array"], + "items": { + "type": ["null", "object"], + "properties": { + "name": { "type": "string" }, + "value": { "type": "string" } + } + } + } + } + }, + "send_strategy": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "method": { "type": "string" }, + "options_static": { + "type": ["null", "object"], + "properties": { + "datetime": { + "type": "string", + "format": "date-time", + "airbyte_type": "timestamp_without_timezone" + }, + "is_local": { "type": ["null", "boolean"] }, + "send_past_recipients_immediately": { + "type": ["null", "boolean"] + } + } + }, + "options_throttled": { + "type": ["null", "object"], + "properties": { + "datetime": { + "type": "string", + "format": "date-time", + "airbyte_type": "timestamp_without_timezone" + }, + "throttle_percentage": { "type": "integer" } + } + }, + "options_sto": { + "type": ["null", "object"], + "properties": { + "date": { "type": "string", "format": "date" } + } + } + } + }, + "created_at": { "type": ["null", "string"], "format": "date-time" }, + "scheduled_at": { "type": ["null", "string"], "format": "date-time" }, + "updated_at": { "type": ["null", "string"], "format": "date-time" }, + "send_time": { "type": ["null", "string"], "format": "date-time" } } }, - "excluded_lists": { - "type": "array", - "items": { - "type": "object", - "properties": { - "object": { "type": "string" }, - "id": { "type": "string" }, - "name": { "type": "string" }, - "created": { "type": "string", "format": "date-time" }, - "updated": { "type": "string", "format": "date-time" }, - "person_count": { "type": "integer" }, - "list_type": { "type": "string" }, - "folder": { "type": ["null", "string"] } - } + "links": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "self": { "type": "string" } } }, - "is_segmented": { "type": "boolean" }, - "send_time": { "type": ["null", "string"], "format": "date-time" }, - "sent_at": { "type": ["null", "string"], "format": "date-time" }, - "campaign_type": { "type": "string" }, - "subject": { "type": ["null", "string"] }, - "message_type": { "type": "string" }, - "template_id": { "type": ["null", "string"] } + "relationships": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "tags": { + "type": ["null", "object"], + "properties": { + "data": { + "type": "array", + "items": { + "type": ["null", "object"], + "properties": { + "type": { "type": "string" }, + "id": { "type": "string" } + } + } + }, + "links": { + "type": ["null", "object"], + "properties": { + "self": { "type": "string" }, + "related": { "type": "string" } + } + } + } + } + } + } } } diff --git a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/flows.json b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/flows.json index 54ea3dded05c..50ce2ba1727d 100644 --- a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/flows.json +++ b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/flows.json @@ -1,13 +1,80 @@ { + "$schema": "http://json-schema.org/draft-07/schema#", "type": "object", + "additionalProperties": true, "properties": { - "object": { "type": "string" }, + "type": { "type": "string" }, "id": { "type": "string" }, - "name": { "type": "string" }, - "status": { "type": "string" }, - "created": { "type": "string", "format": "date-time" }, "updated": { "type": "string", "format": "date-time" }, - "customer_filter": { "type": ["null", "object"] }, - "trigger": { "type": "object" } + "attributes": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "name": { "type": "string" }, + "status": { "type": "string" }, + "archived": { "type": "boolean" }, + "created": { "type": "string", "format": "date-time" }, + "updated": { "type": "string", "format": "date-time" }, + "trigger_type": { "type": "string" } + } + }, + "links": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "self": { "type": "string" } + } + }, + "relationships": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "flow-actions": { + "type": ["null", "object"], + "properties": { + "data": { + "type": "array", + "items": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "type": { "type": "string" }, + "id": { "type": "string" } + } + } + }, + "links": { + "type": ["null", "object"], + "properties": { + "self": { "type": "string" }, + "related": { "type": "string" } + } + } + } + }, + "tags": { + "type": ["null", "object"], + "properties": { + "data": { + "type": "array", + "items": { + "type": ["null", "object"], + "properties": { + "type": { "type": "string" }, + "id": { "type": "string" } + } + } + }, + "links": { + "type": ["null", "object"], + "properties": { + "self": { "type": "string" }, + "related": { "type": "string" } + } + } + } + } + } + } } -} +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/source.py b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/source.py index 4843cd213aa1..999ebd9bbfcf 100644 --- a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/source.py +++ b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/source.py @@ -41,7 +41,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: api_key = config["api_key"] start_date = config["start_date"] return [ - Campaigns(api_key=api_key), + Campaigns(api_key=api_key, start_date=start_date), Events(api_key=api_key, start_date=start_date), GlobalExclusions(api_key=api_key, start_date=start_date), Lists(api_key=api_key), diff --git a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py index bb2e35e30abe..6b58530d138d 100644 --- a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py +++ b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py @@ -11,6 +11,7 @@ import pendulum from datetime import timedelta import requests +from airbyte_cdk.models import SyncMode from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy from airbyte_cdk.sources.streams.http import HttpStream from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer @@ -37,7 +38,7 @@ def request_headers(self, **kwargs) -> Mapping[str, Any]: headers = { "Accept": "application/json", "Content-Type": "application/json", - "Revision": "2023-02-22", + "Revision": "2024-05-15", "Authorization": "Klaviyo-API-Key " + self._api_key, } @@ -65,8 +66,9 @@ def request_params( # If next_page_token is set, all of the parameters are already provided if next_page_token: return next_page_token - else: + elif self.page_size: return {"page[size]": self.page_size} + return {} def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: """:return an iterable containing each record in the response""" @@ -96,7 +98,12 @@ def cursor_field(self) -> Union[str, List[str]]: :return str: The name of the cursor field. """ - def request_params(self, stream_state: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None, **kwargs): + def request_params( + self, + stream_state: Mapping[str, Any] = None, + stream_slice: Optional[Mapping[str, Any]] = None, + next_page_token: Mapping[str, Any] = None, + **kwargs): """Add incremental filters""" stream_state = stream_state or {} @@ -122,6 +129,44 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late return {self.cursor_field: latest_cursor.isoformat()} +class IncrementalKlaviyoStreamLatestWithArchivedRecords(IncrementalKlaviyoStreamLatest, ABC): + + def stream_slices( + self, *, sync_mode: SyncMode, cursor_field: Optional[List[str]] = None, stream_state: Optional[Mapping[str, Any]] = None + ) -> Iterable[Optional[Mapping[str, Any]]]: + return [{"archived": value} for value in [False, True]] + + def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: + archived = latest_record.get("attributes", {}).get("archived", False) + if archived: + current_stream_cursor_value = current_stream_state.get("archived", {}).get(self.cursor_field, self._start_ts) + latest_record_cursor_value = latest_record[self.cursor_field] + latest_cursor = max(pendulum.parse(latest_record_cursor_value), pendulum.parse(current_stream_cursor_value)) + current_stream_state["archived"] = {self.cursor_field: latest_cursor} + return current_stream_state + else: + return super().get_updated_state(current_stream_state, latest_record) + + def request_params( + self, + stream_state: Mapping[str, Any] = None, + stream_slice: Optional[Mapping[str, Any]] = None, + next_page_token: Mapping[str, Any] = None, + **kwargs): + current_stream_state = stream_state + if stream_state.get("archived"): + current_stream_state = stream_state.get("archived", {}) + params = super().request_params(current_stream_state, stream_slice, next_page_token) + archived = stream_slice.get("archived", False) + if archived: + archived_filter = "equals(archived,true)" + if "filter" in params and archived_filter not in params["filter"]: + params["filter"] = f"and({params['filter']},{archived_filter})" + elif "filter" not in params: + params["filter"] = archived_filter + return params + + class Profiles(IncrementalKlaviyoStreamLatest): """Docs: https://developers.klaviyo.com/en/reference/get_profiles""" @@ -357,12 +402,68 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp yield record -class Campaigns(KlaviyoStreamV1): +class Campaigns(IncrementalKlaviyoStreamLatest): """Docs: https://developers.klaviyo.com/en/reference/get-campaigns""" - def path(self, **kwargs) -> str: + cursor_field = "updated_at" + page_size = None + current_channel = None + + def path(self, *args, next_page_token: Optional[Mapping[str, Any]] = None, **kwargs) -> str: return "campaigns" + def map_record(self, record: Mapping): + record[self.cursor_field] = record["attributes"][self.cursor_field] + return record + + def stream_slices( + self, *, sync_mode: SyncMode, cursor_field: Optional[List[str]] = None, stream_state: Optional[Mapping[str, Any]] = None + ) -> Iterable[Optional[Mapping[str, Any]]]: + archived_flags = [False, True] + message_channels = ["email", "sms"] + return [{"archived": flag, "channel": channel} for flag in archived_flags for channel in message_channels] + + def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: + archived = latest_record.get("attributes", {}).get("archived", False) + updated_state = super().get_updated_state(current_stream_state, latest_record) + if archived: + current_stream_cursor_value = current_stream_state.get("archived", {}).get(self.cursor_field, self._start_ts) + latest_record_cursor_value = latest_record[self.cursor_field] + latest_cursor = max(pendulum.parse(latest_record_cursor_value), pendulum.parse(current_stream_cursor_value)) + current_stream_state["archived"] = {self.current_channel: {self.cursor_field: latest_cursor}} + else: + current_stream_state[self.current_channel] = updated_state + return current_stream_state + + def request_params( + self, + stream_state: Mapping[str, Any] = None, + stream_slice: Optional[Mapping[str, Any]] = None, + next_page_token: Mapping[str, Any] = None, + **kwargs): + archived = stream_slice.get("archived", False) + channel = stream_slice.get("channel", False) + if archived: + current_stream_state = stream_state.get("archived", {}).get(channel, {}) + else: + current_stream_state = stream_state.get(channel, {}) + params = super().request_params(current_stream_state, stream_slice, next_page_token) + + self.current_channel = channel + channel_filter = f"equals(messages.channel,'{channel}')" + if "filter" in params and channel_filter not in params["filter"]: + params["filter"] = f"{params['filter']},{channel_filter}" + elif "filter" not in params: + params["filter"] = channel_filter + + if archived: + archived_filter = "equals(archived,true)" + if "filter" in params and archived_filter not in params["filter"]: + params["filter"] = f"{params['filter']},{archived_filter}" + elif "filter" not in params: + params["filter"] = archived_filter + return params + class Lists(KlaviyoStreamV1): """Docs: https://developers.klaviyo.com/en/reference/get-lists""" @@ -465,12 +566,18 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, return None -class Flows(ReverseIncrementalKlaviyoStreamV1): - cursor_field = "created" - def path(self, **kwargs) -> str: +class Flows(IncrementalKlaviyoStreamLatestWithArchivedRecords): + cursor_field = "updated" + page_size = None + + def path(self, *args, next_page_token: Optional[Mapping[str, Any]] = None, **kwargs) -> str: return "flows" + def map_record(self, record: Mapping): + record[self.cursor_field] = record["attributes"][self.cursor_field] + return record + class EmailTemplates(KlaviyoStreamV1): """ From dea68ce7785ec9d77ac7f0e30b565bf6f24d4d1d Mon Sep 17 00:00:00 2001 From: Alexandros Milaios Date: Fri, 24 May 2024 12:48:38 +0300 Subject: [PATCH 2/8] feat: klaviyo v2 - events - lists --- .../source_klaviyo/schemas/events.json | 153 ++++++++++++++---- .../source_klaviyo/schemas/lists.json | 72 ++++++++- .../source-klaviyo/source_klaviyo/source.py | 2 +- .../source-klaviyo/source_klaviyo/streams.py | 54 +++---- 4 files changed, 207 insertions(+), 74 deletions(-) diff --git a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/events.json b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/events.json index 7cf830f697a8..908f8d92e2f5 100644 --- a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/events.json +++ b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/events.json @@ -1,42 +1,133 @@ { "type": "object", - "properties": { - "object": { "type": "string" }, - "id": { "type": "string" }, - "uuid": { "type": "string" }, - "event_name": { "type": "string" }, - "timestamp": { "type": "integer" }, - "datetime": { "type": "string" }, - "statistic_id": { "type": "string" }, - "event_properties": { - "type": "object", + "properties": { + "type": { + "type": "string" + }, + "id": { + "type": "string" + }, + "datetime": { + "type": "string", + "format": "date-time" + }, + "attributes": { + "type": [ + "null", + "object" + ], "properties": { - "$value": { "type": "number" }, - "items": { - "type": "array", - "items": { - "type": "object", - "properties": { - "object": { "type": "string" }, - "name": { "type": "string" }, - "sku": { "type": "string" }, - "price": { "type": "number" }, - "quantity": { "type": "integer" } - } - } + "timestamp": { + "type": "integer" + }, + "event_properties": { + "type": [ + "null", + "object" + ], + "additionalProperties": true + }, + "datetime": { + "type": "string", + "format": "date-time" + }, + "uuid": { + "type": "string" } } }, - "person": { - "type": "object", + "links": { + "type": [ + "null", + "object" + ], "properties": { - "id": { "type": "string" }, - "object": { "type": "string" }, - "$email": { "type": "string" } + "self": { + "type": "string" + } } }, - "flow_id": { "type": ["null", "string"] }, - "campaign_id": { "type": ["null", "string"] }, - "flow_message_id": { "type": ["null", "string"] } + "relationships": { + "type": [ + "null", + "object" + ], + "properties": { + "profile": { + "type": [ + "null", + "object" + ], + "properties": { + "data": { + "type": [ + "null", + "object" + ], + "properties": { + "type": { + "type": "string" + }, + "id": { + "type": "string" + } + } + }, + "links": { + "type": [ + "null", + "object" + ], + "additionalProperties": true, + "properties": { + "self": { + "type": "string" + }, + "related": { + "type": "string" + } + } + } + } + }, + "metric": { + "type": [ + "null", + "object" + ], + "properties": { + "data": { + "type": [ + "null", + "object" + ], + "properties": { + "type": { + "type": "string" + }, + "id": { + "type": "string" + } + } + }, + "links": { + "type": [ + "null", + "object" + ], + "additionalProperties": true, + "properties": { + "self": { + "type": "string" + }, + "related": { + "type": "string" + } + } + } + } + } + } + } } } diff --git a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/lists.json b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/lists.json index da721281b4e4..a8486090cc97 100644 --- a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/lists.json +++ b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/lists.json @@ -1,13 +1,71 @@ { + "$schema": "http://json-schema.org/draft-07/schema#", "type": "object", + "additionalProperties": true, "properties": { - "object": { "type": "string" }, + "type": { "type": "string" }, "id": { "type": "string" }, - "name": { "type": "string" }, - "created": { "type": "string", "format": "date-time" }, "updated": { "type": "string", "format": "date-time" }, - "person_count": { "type": "integer" }, - "list_type": { "type": "string" }, - "folder": { "type": ["null", "string"] } + "attributes": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "name": { "type": "string" }, + "created": { "type": "string", "format": "date-time" }, + "updated": { "type": "string", "format": "date-time" }, + "opt_in_process": { "type": ["string", "null"] } + } + }, + "links": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "self": { "type": "string" } + } + }, + "relationships": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "profiles": { + "type": ["null", "object"], + "properties" : { + "links": { + "type" : ["null", "object"], + "properties" : { + "self": { + "type" : "string" + }, + "related": { + "type" : "string" + } + } + } + } + }, + "tags": { + "type": ["null", "object"], + "properties": { + "data": { + "type": "array", + "items": { + "type": ["null", "object"], + "properties": { + "type": { "type": "string" }, + "id": { "type": "string" } + } + } + }, + "links": { + "type": ["null", "object"], + "properties": { + "self": { "type": "string" }, + "related": { "type": "string" } + } + } + } + } + } + } } -} +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/source.py b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/source.py index 999ebd9bbfcf..8868b547b9a6 100644 --- a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/source.py +++ b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/source.py @@ -44,7 +44,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: Campaigns(api_key=api_key, start_date=start_date), Events(api_key=api_key, start_date=start_date), GlobalExclusions(api_key=api_key, start_date=start_date), - Lists(api_key=api_key), + Lists(api_key=api_key, start_date=start_date), Metrics(api_key=api_key), Flows(api_key=api_key, start_date=start_date), EmailTemplates(api_key=api_key), diff --git a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py index 6b58530d138d..fad94456ab46 100644 --- a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py +++ b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py @@ -465,14 +465,20 @@ def request_params( return params -class Lists(KlaviyoStreamV1): +class Lists(IncrementalKlaviyoStreamLatest): """Docs: https://developers.klaviyo.com/en/reference/get-lists""" + cursor_field = "updated" max_retries = 10 + page_size = None def path(self, **kwargs) -> str: return "lists" + def map_record(self, record: Mapping): + record[self.cursor_field] = record["attributes"][self.cursor_field] + return record + class GlobalExclusions(ReverseIncrementalKlaviyoStreamV1): """Docs: https://developers.klaviyo.com/en/reference/get-global-exclusions""" @@ -511,61 +517,39 @@ def flatten_dict(rec, level): return processed_record -class Events(IncrementalKlaviyoStreamV1): +class Events(IncrementalKlaviyoStreamLatest): """Docs: https://developers.klaviyo.com/en/reference/metrics-timeline""" - cursor_field = "timestamp" + cursor_field = "datetime" + page_size = None def __init__(self, **kwargs): super().__init__(**kwargs) self.last_next_token = None - @property - def look_back_window_in_seconds(self) -> Optional[int]: - return timedelta(minutes=30).seconds - def path(self, **kwargs) -> str: - return "metrics/timeline" + return "events" + + @property + def state_checkpoint_interval(self) -> Optional[int]: + return 5000 def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: """:return an iterable containing each record in the response""" response_json = response.json() for record in response_json.get("data", []): - flow = record["event_properties"].get("$flow") - flow_message_id = record["event_properties"].get("$message") + attributes = record["attributes"] + flow = attributes.get("$flow") + flow_message_id = attributes.get("$message") record["flow_id"] = flow record["flow_message_id"] = flow_message_id record["campaign_id"] = flow_message_id if not flow else None + record[self.cursor_field] = attributes[self.cursor_field] yield process_record(record) - def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: - super_state = super().get_updated_state(current_stream_state, latest_record) - super_state["last_next_token"] = self.last_next_token - return super_state - - def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: - """ - This method should return a Mapping (e.g: dict) containing whatever information required to make paginated requests. This dict is passed - to most other methods in this class to help you form headers, request bodies, query params, etc.. - :param response: the most recent response from the API - :return If there is another page in the result, a mapping (e.g: dict) containing information needed to query the next page in the response. - If there are no more pages in the result, return None. - """ - decoded_response = response.json() - if decoded_response.get("next"): - next_token = decoded_response["next"] - self.last_next_token = next_token - return {"since": next_token} - - self.last_next_token = None - data = decoded_response.get("data", [{}]) or [{}] - self.logger.info("Last timestamp -> " + str(data[-1].get("timestamp", "No timestamp"))) - - return None - class Flows(IncrementalKlaviyoStreamLatestWithArchivedRecords): cursor_field = "updated" From 4362c9a3224d1132249b377a92ac9f8e0ef080f9 Mon Sep 17 00:00:00 2001 From: Alexandros Milaios Date: Fri, 24 May 2024 18:22:15 +0300 Subject: [PATCH 3/8] feat: klaviyo v2 - metrics --- .../source_klaviyo/schemas/metrics.json | 29 ++++++++++++------- .../source-klaviyo/source_klaviyo/streams.py | 17 +++++++---- 2 files changed, 31 insertions(+), 15 deletions(-) diff --git a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/metrics.json b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/metrics.json index bfdb31a0e45e..e2ceda51361a 100644 --- a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/metrics.json +++ b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/metrics.json @@ -1,19 +1,28 @@ { + "$schema": "http://json-schema.org/draft-07/schema#", "type": "object", + "additionalProperties": true, "properties": { - "object": { "type": "string" }, + "type": { "type": "string" }, "id": { "type": "string" }, - "name": { "type": "string" }, - "created": { "type": "string", "format": "date-time" }, - "updated": { "type": "string", "format": "date-time" }, - "integration": { - "type": "object", + "attributes": { + "type": ["null", "object"], "properties": { - "object": { "type": "string" }, - "id": { "type": "string" }, "name": { "type": "string" }, - "category": { "type": "string" } + "created": { "type": "string", "format": "date-time" }, + "updated": { "type": "string", "format": "date-time" }, + "integration": { + "type": [ "null", "object"], + "additionalProperties": true + } + } + }, + "links": { + "type": [ "null", "object" ], + "additionalProperties": true, + "properties": { + "self": { "type": "string"} } } } -} +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py index fad94456ab46..99ae5e78edc0 100644 --- a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py +++ b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py @@ -9,7 +9,6 @@ from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union import pendulum -from datetime import timedelta import requests from airbyte_cdk.models import SyncMode from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy @@ -23,6 +22,7 @@ class KlaviyoStreamLatest(HttpStream, ABC): url_base = "https://a.klaviyo.com/api/" primary_key = "id" page_size = 100 + include = None def __init__(self, api_key: str, **kwargs): super().__init__(**kwargs) @@ -66,9 +66,12 @@ def request_params( # If next_page_token is set, all of the parameters are already provided if next_page_token: return next_page_token - elif self.page_size: - return {"page[size]": self.page_size} - return {} + params = {} + if self.page_size: + params["page[size]"] = self.page_size + if self.include: + params["include"] = self.include + return params def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: """:return an iterable containing each record in the response""" @@ -471,6 +474,7 @@ class Lists(IncrementalKlaviyoStreamLatest): cursor_field = "updated" max_retries = 10 page_size = None + include = "tags" def path(self, **kwargs) -> str: return "lists" @@ -491,8 +495,9 @@ def path(self, **kwargs) -> str: return "people/exclusions" -class Metrics(KlaviyoStreamV1): +class Metrics(KlaviyoStreamLatest): """Docs: https://developers.klaviyo.com/en/reference/get-metrics""" + page_size = None def path(self, **kwargs) -> str: return "metrics" @@ -522,6 +527,7 @@ class Events(IncrementalKlaviyoStreamLatest): cursor_field = "datetime" page_size = None + include = "attributions,metric,profile" def __init__(self, **kwargs): super().__init__(**kwargs) @@ -554,6 +560,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp class Flows(IncrementalKlaviyoStreamLatestWithArchivedRecords): cursor_field = "updated" page_size = None + include = "flow-actions,tags" def path(self, *args, next_page_token: Optional[Mapping[str, Any]] = None, **kwargs) -> str: return "flows" From 35fd05a6d6e3415f0e98887aff34aebc5232013f Mon Sep 17 00:00:00 2001 From: Alexandros Milaios Date: Tue, 28 May 2024 10:04:31 +0300 Subject: [PATCH 4/8] feat: klaviyo v2 - profiles - global_exclusions - email_templates --- .../schemas/email_templates.json | 90 +++++++++++++++---- .../schemas/global_exclusions.json | 31 ++++++- .../source-klaviyo/source_klaviyo/source.py | 2 +- .../source-klaviyo/source_klaviyo/streams.py | 49 ++++++++-- 4 files changed, 139 insertions(+), 33 deletions(-) diff --git a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/email_templates.json b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/email_templates.json index 09a5cba42c4b..1ecc20730a38 100644 --- a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/email_templates.json +++ b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/email_templates.json @@ -1,27 +1,79 @@ { + "$schema": "http://json-schema.org/draft-07/schema#", "type": "object", "additionalProperties": true, "properties": { - "object": { + "type": { + "type": "string" + }, + "id": { + "type": "string" + }, + "updated": { + "type": [ + "null", + "string" + ], + "format": "date-time" + }, + "attributes": { + "type": [ + "null", + "object" + ], + "additionalProperties": true, + "properties": { + "name": { "type": "string" - }, - "id": { + }, + "editor_type": { + "type": [ + "null", + "string" + ] + }, + "html": { "type": "string" - }, - "name": { - "type": ["null", "string"] - }, - "html": { - "type": ["null", "string"] - }, - "is_writeable": { - "type": ["null", "boolean"] - }, - "created": { - "type": "string", "format": "date-time" - }, - "updated": { - "type": "string", "format": "date-time" + }, + "text": { + "type": [ + "null", + "string" + ] + }, + "created": { + "type": [ + "null", + "string" + ], + "format": "date-time" + }, + "updated": { + "type": [ + "null", + "string" + ], + "format": "date-time" + }, + "company_id": { + "type": [ + "null", + "string" + ] + } } + }, + "links": { + "type": [ + "null", + "object" + ], + "additionalProperties": true, + "properties": { + "self": { + "type": "string" + } + } + } } -} + } \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/global_exclusions.json b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/global_exclusions.json index 84e21e29f70e..a7ca8550707d 100644 --- a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/global_exclusions.json +++ b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/global_exclusions.json @@ -1,9 +1,32 @@ { "type": "object", + "additionalProperties": true, "properties": { - "object": { "type": "string" }, - "email": { "type": "string" }, - "reason": { "type": "string" }, - "timestamp": { "type": "string", "format": "date-time" } + "type": { "type": ["null", "string"] }, + "id": { "type": "string" }, + "updated": { "type": ["null", "string"], "format": "date-time" }, + "attributes": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "email": { "type": ["null", "string"] }, + "phone_number": { "type": ["null", "string"] }, + "first_name": { "type": ["null", "string"] }, + "last_name": { "type": ["null", "string"] }, + "properties": { + "type": ["null", "object"], + "additionalProperties": true + }, + "subscriptions": { "type": ["null", "object"] }, + "organization": { "type": ["null", "string"] }, + "title": { "type": ["null", "string"] }, + "created": { "type": ["null", "string"], "format": "date-time" }, + "updated": { "type": ["null", "string"], "format": "date-time" }, + "last_event_date": { "type": ["null", "string"], "format": "date-time" } + } + }, + "links": { "type": ["null", "object"] }, + "relationships": { "type": ["null", "object"] }, + "segments": { "type": ["null", "object"] } } } diff --git a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/source.py b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/source.py index 8868b547b9a6..f62d3c99dfd1 100644 --- a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/source.py +++ b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/source.py @@ -47,6 +47,6 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: Lists(api_key=api_key, start_date=start_date), Metrics(api_key=api_key), Flows(api_key=api_key, start_date=start_date), - EmailTemplates(api_key=api_key), + EmailTemplates(api_key=api_key, start_date=start_date), Profiles(api_key=api_key, start_date=start_date), ] diff --git a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py index 99ae5e78edc0..276569f0ef2f 100644 --- a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py +++ b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py @@ -174,6 +174,7 @@ class Profiles(IncrementalKlaviyoStreamLatest): """Docs: https://developers.klaviyo.com/en/reference/get_profiles""" cursor_field = "updated" + page_size = 100 def path(self, *args, next_page_token: Optional[Mapping[str, Any]] = None, **kwargs) -> str: return "profiles" @@ -182,6 +183,17 @@ def map_record(self, record: Mapping): record[self.cursor_field] = record["attributes"][self.cursor_field] return record + def request_params( + self, + stream_state: Mapping[str, Any] = None, + stream_slice: Optional[Mapping[str, Any]] = None, + next_page_token: Mapping[str, Any] = None, + **kwargs): + """Add incremental filters""" + params = super().request_params(stream_state, stream_slice, next_page_token) + params["additional-fields[profile]"] = "predictive_analytics" + return params + class KlaviyoStreamV1(HttpStream, ABC): """Base stream for api v1""" @@ -323,7 +335,7 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, decoded_response = response.json() if decoded_response.get("next"): return {"since": decoded_response["next"]} - + data = decoded_response.get("data", [{}]) or [{}] self.logger.info("Last timestamp -> " + str(data[-1].get("timestamp", "No timestamp"))) @@ -484,15 +496,27 @@ def map_record(self, record: Mapping): return record -class GlobalExclusions(ReverseIncrementalKlaviyoStreamV1): +class GlobalExclusions(Profiles): """Docs: https://developers.klaviyo.com/en/reference/get-global-exclusions""" + suppression_fields = ["attributes", "subscriptions", "email", "marketing", "suppression"] + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + """:return an iterable containing each record in the response""" + response_json = response.json() + for record in response_json.get("data", []): # API returns records in a container array "data" + record = self.map_record(record) + if record: + yield record - page_size = 5000 # the maximum value allowed by API - cursor_field = "timestamp" - primary_key = "email" + def map_record(self, record: Mapping): + nested_record = record + for field in self.suppression_fields: + if field not in nested_record: + return None + nested_record = record[field] - def path(self, **kwargs) -> str: - return "people/exclusions" + record[self.cursor_field] = record["attributes"][self.cursor_field] + return record class Metrics(KlaviyoStreamLatest): @@ -570,10 +594,17 @@ def map_record(self, record: Mapping): return record -class EmailTemplates(KlaviyoStreamV1): +class EmailTemplates(IncrementalKlaviyoStreamLatest): """ Docs: https://developers.klaviyo.com/en/v1-2/reference/get-templates """ + page_size = None + cursor_field = "updated" + + def map_record(self, record: Mapping): + record[self.cursor_field] = record["attributes"][self.cursor_field] + return record + def path(self, **kwargs) -> str: - return "email-templates" + return "templates" From e00a856d3303ba1bbe26ed0d38719c8b88e9befa Mon Sep 17 00:00:00 2001 From: Alexandros Milaios Date: Thu, 30 May 2024 19:13:14 +0300 Subject: [PATCH 5/8] feat: klaviyo v2 - segments - segments profiles --- .../source_klaviyo/schemas/segments.json | 213 ++++++++++++++++++ .../schemas/segments_profiles.json | 153 +++++++++++++ .../source-klaviyo/source_klaviyo/source.py | 4 +- .../source-klaviyo/source_klaviyo/streams.py | 67 ++++-- 4 files changed, 416 insertions(+), 21 deletions(-) create mode 100644 airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/segments.json create mode 100644 airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/segments_profiles.json diff --git a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/segments.json b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/segments.json new file mode 100644 index 000000000000..7945c2cee6e9 --- /dev/null +++ b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/segments.json @@ -0,0 +1,213 @@ +{ + "type": "object", + "additionalProperties": true, + "properties": + { + "type": + { + "type": + [ + "null", + "string" + ] + }, + "id": + { + "type": "string" + }, + "updated": + { + "type": + [ + "null", + "string" + ], + "format": "date-time" + }, + "attributes": + { + "additionalProperties": true, + "type": + [ + "null", + "object" + ], + "properties": + { + "name": + { + "type": "string" + }, + "additionalProperties": true, + "is_active": + { + "type": "boolean" + }, + "is_processing": + { + "type": "boolean" + }, + "is_started": + { + "type": "boolean" + }, + "created": + { + "type": + [ + "null", + "string" + ], + "format": "date-time" + }, + "updated": + { + "type": + [ + "null", + "string" + ], + "format": "date-time" + } + } + }, + "links": + { + "type": + [ + "null", + "object" + ], + "additionalProperties": true, + "properties": + { + "self": + { + "type": "string" + } + } + }, + "relationships": + { + "type": + [ + "null", + "object" + ], + "properties": + { + "profiles": + { + "type": + [ + "null", + "object" + ], + "properties": + { + "links": + { + "type": + [ + "null", + "object" + ], + "properties": + { + "self": + { + "type": "string" + }, + "related": + { + "type": "string" + } + } + } + } + }, + "tags": + { + "type": + [ + "null", + "object" + ], + "date": + { + "type": + { + "type": "string" + }, + "id": + { + "type": "string" + } + }, + "links": + { + "type": + [ + "null", + "object" + ], + "properties": + { + "self": + { + "type": "string" + }, + "related": + { + "type": "string" + } + } + } + } + } + }, + "included": + { + "type": + { + "type": "string" + }, + "id": + { + "type": "string" + }, + "attributes": + { + "type": + [ + "null", + "object" + ], + "additionalProperties": true, + "properties": + { + "name": + { + "type": "string" + } + } + }, + "links": + { + "type": + [ + "null", + "objects" + ], + "properties": + { + "additionalProperties": true, + "self": + { + "type": "string" + } + } + } + } + } +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/segments_profiles.json b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/segments_profiles.json new file mode 100644 index 000000000000..b774efbcf72b --- /dev/null +++ b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/schemas/segments_profiles.json @@ -0,0 +1,153 @@ +{ + "type": "object", + "additionalProperties": true, + "properties": { + "type": {"type": ["null", "string"]}, + "id": {"type": "string"}, + "attributes": { + "email": {"type": ["null", "string"]}, + "phone_number": {"type": ["null", "string"]}, + "external_id" : {"type": ["null", "string"]}, + "first_name": {"type": ["null", "string"]}, + "last_name": {"type": ["null", "string"]}, + "organization": {"type": ["null", "string"]}, + "title": {"type": ["null", "string"]}, + "image": {"type": ["null", "string"]}, + "updated": {"type": ["null", "string"], "format": "date-time"}, + "created": {"type": ["null", "string"], "format": "date-time"}, + "last_event_date": {"type": ["null", "string"], "format": "date-time"}, + "joined_group_at": {"type": ["null", "string"], "format": "date-time"}, + "location": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "address1": {"type": ["null", "string"]}, + "address2": {"type": ["null", "string"]}, + "city": {"type": ["null", "string"]}, + "country": {"type": ["null", "string"]}, + "latitude": {"type": ["null", "string"]}, + "longitude": {"type": ["null", "string"]}, + "region": {"type": ["null", "string"]}, + "zip": {"type": ["null", "string"]}, + "timezone": {"type": ["null", "string"]}, + "ip": {"type": ["null", "string"]} + }, + "subscriptions": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "email": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "marketing": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "can_receive_email_marketing": {"type": "boolean"}, + "consent": {"type": ["null", "string"]}, + "consent_timestamp": {"type": ["null", "string"], "format": "date-time"}, + "last_updated": {"type": ["null", "string"], "format": "date-time"}, + "method": {"type": ["null", "string"]}, + "method_details": {"type": ["null", "string"]}, + "custom_method_detail": {"type": ["null", "string"]}, + "double_optin" : {"type": ["null", "string"]}, + "suppression": { + "type": ["null", "array"], + "items": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "reason" : {"type" : ["null", "string"]}, + "timestamp" : {"type" : ["null", "string"], "format" : "date-time"} + } + } + }, + "list_suppressions": { + "type": ["null", "array"], + "items": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "list_id" : {"type" : ["null", "string"]}, + "reason" : {"type" : ["null", "string"]}, + "timestamp" : {"type" : ["null", "string"], "format" : "date-time"} + } + } + } + } + } + } + }, + "sms": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "marketing": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "can_receive_sms_marketing": {"type": "boolean"}, + "consent": {"type": ["null", "string"]}, + "consent_timestamp": {"type": ["null", "string"], "format": "date-time"}, + "method": {"type": ["null", "string"]}, + "method_details": {"type": ["null", "string"]}, + "last_updated": {"type": ["null", "string"], "format": "date-time"} + } + } + } + } + } + }, + "predictive_analytics": { + "historic_clv": {"type": ["null", "number"]}, + "predicted_clv": {"type": ["null", "number"]}, + "total_clv": {"type": ["null", "number"]}, + "historic_number_of_orders": {"type": ["null", "number"]}, + "predicted_number_of_orders": {"type": ["null", "number"]}, + "average_days_between_orders": {"type": ["null", "number"]}, + "average_order_value": {"type": ["null", "number"]}, + "churn_probability": {"type": ["null", "number"]}, + "expected_date_of_next_order": {"type": ["null", "string"], "format": "date-time"} + }, + "links": { + "type": ["null", "object"], + "properties": { + "self": { "type": "string" }, + "related": { "type": "string" } + } + }, + "relationships": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "lists": { + "type": ["null", "object"], + "properties" : { + "links": { + "type" : ["null", "object"], + "properties" : { + "self": {"type" : "string"}, + "related": {"type" : "string"} + } + } + } + }, + "segments": { + "type": ["null", "object"], + "properties" : { + "links": { + "type" : ["null", "object"], + "properties" : { + "self": {"type" : "string"}, + "related": {"type" : "string"} + } + } + } + } + } + } + } + } + } +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/source.py b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/source.py index f62d3c99dfd1..160e3470bb64 100644 --- a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/source.py +++ b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/source.py @@ -8,7 +8,7 @@ from airbyte_cdk.models import SyncMode from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources.streams import Stream -from source_klaviyo.streams import Campaigns, EmailTemplates, Events, Flows, GlobalExclusions, Lists, Metrics, Profiles +from source_klaviyo.streams import Campaigns, EmailTemplates, Events, Flows, GlobalExclusions, Lists, Metrics, Profiles, Segments, SegmentsProfiles class SourceKlaviyo(AbstractSource): @@ -49,4 +49,6 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: Flows(api_key=api_key, start_date=start_date), EmailTemplates(api_key=api_key, start_date=start_date), Profiles(api_key=api_key, start_date=start_date), + Segments(api_key=api_key, start_date=start_date), + SegmentsProfiles(api_key=api_key, start_date=start_date) ] diff --git a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py index 276569f0ef2f..75858b269f92 100644 --- a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py +++ b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py @@ -92,6 +92,10 @@ def __init__(self, start_date: str, **kwargs): super().__init__(**kwargs) self._start_ts = start_date + def map_record(self, record: Mapping): + record[self.cursor_field] = record["attributes"][self.cursor_field] + return record + @property @abstractmethod def cursor_field(self) -> Union[str, List[str]]: @@ -179,10 +183,6 @@ class Profiles(IncrementalKlaviyoStreamLatest): def path(self, *args, next_page_token: Optional[Mapping[str, Any]] = None, **kwargs) -> str: return "profiles" - def map_record(self, record: Mapping): - record[self.cursor_field] = record["attributes"][self.cursor_field] - return record - def request_params( self, stream_state: Mapping[str, Any] = None, @@ -427,10 +427,6 @@ class Campaigns(IncrementalKlaviyoStreamLatest): def path(self, *args, next_page_token: Optional[Mapping[str, Any]] = None, **kwargs) -> str: return "campaigns" - def map_record(self, record: Mapping): - record[self.cursor_field] = record["attributes"][self.cursor_field] - return record - def stream_slices( self, *, sync_mode: SyncMode, cursor_field: Optional[List[str]] = None, stream_state: Optional[Mapping[str, Any]] = None ) -> Iterable[Optional[Mapping[str, Any]]]: @@ -491,10 +487,6 @@ class Lists(IncrementalKlaviyoStreamLatest): def path(self, **kwargs) -> str: return "lists" - def map_record(self, record: Mapping): - record[self.cursor_field] = record["attributes"][self.cursor_field] - return record - class GlobalExclusions(Profiles): """Docs: https://developers.klaviyo.com/en/reference/get-global-exclusions""" @@ -589,10 +581,6 @@ class Flows(IncrementalKlaviyoStreamLatestWithArchivedRecords): def path(self, *args, next_page_token: Optional[Mapping[str, Any]] = None, **kwargs) -> str: return "flows" - def map_record(self, record: Mapping): - record[self.cursor_field] = record["attributes"][self.cursor_field] - return record - class EmailTemplates(IncrementalKlaviyoStreamLatest): """ @@ -602,9 +590,48 @@ class EmailTemplates(IncrementalKlaviyoStreamLatest): page_size = None cursor_field = "updated" - def map_record(self, record: Mapping): - record[self.cursor_field] = record["attributes"][self.cursor_field] - return record - def path(self, **kwargs) -> str: return "templates" + + +class Segments(IncrementalKlaviyoStreamLatest): + """ + Docs: https://developers.klaviyo.com/en/v1-2/reference/get-templates + """ + + page_size = None + cursor_field = "updated" + + def path(self, **kwargs) -> str: + return "segments" + + +class SegmentsProfiles(KlaviyoStreamLatest): + parent_id: str = "id" + page_size = 100 + + def __init__(self, start_date: str, **kwargs): + super().__init__(**kwargs) + self._start_ts = start_date + + def stream_slices( + self, *, sync_mode: SyncMode, cursor_field: Optional[List[str]] = None, stream_state: Optional[Mapping[str, Any]] = None + ) -> Iterable[Optional[Mapping[str, Any]]]: + parent_stream = Segments(api_key=self._api_key, start_date=self._start_ts) + slices = parent_stream.stream_slices(sync_mode=SyncMode.full_refresh) + for _slice in slices: + yield from parent_stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=_slice) + + def request_params( + self, + stream_state: Mapping[str, Any] = None, + stream_slice: Optional[Mapping[str, Any]] = None, + next_page_token: Mapping[str, Any] = None, + **kwargs): + """Add incremental filters""" + params = super().request_params(stream_state, stream_slice, next_page_token) + params["additional-fields[profile]"] = "subscriptions,predictive_analytics" + return params + + def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str: + return f"segments/{stream_slice[self.parent_id]}/profiles" From 9ce2d2f20e6a17c44cce00db75b0dc03a78bd17d Mon Sep 17 00:00:00 2001 From: Alexandros Milaios Date: Wed, 5 Jun 2024 16:39:14 +0300 Subject: [PATCH 6/8] feat: klaviyo v2 - add subscriptions for profiles and global exclusions --- .../connectors/source-klaviyo/source_klaviyo/streams.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py index 75858b269f92..3d29209ecec2 100644 --- a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py +++ b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py @@ -191,7 +191,7 @@ def request_params( **kwargs): """Add incremental filters""" params = super().request_params(stream_state, stream_slice, next_page_token) - params["additional-fields[profile]"] = "predictive_analytics" + params["additional-fields[profile]"] = "predictive_analytics,subscriptions" return params @@ -505,7 +505,7 @@ def map_record(self, record: Mapping): for field in self.suppression_fields: if field not in nested_record: return None - nested_record = record[field] + nested_record = nested_record[field] record[self.cursor_field] = record["attributes"][self.cursor_field] return record From 09c4a6d65a7a055d50f1bc36a6f123775397d396 Mon Sep 17 00:00:00 2001 From: Alexandros Milaios Date: Wed, 5 Jun 2024 17:02:15 +0300 Subject: [PATCH 7/8] feat: klaviyo v2 - fix global_exclusions stream --- .../connectors/source-klaviyo/source_klaviyo/streams.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py index 3d29209ecec2..b06db009dfb9 100644 --- a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py +++ b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py @@ -506,6 +506,8 @@ def map_record(self, record: Mapping): if field not in nested_record: return None nested_record = nested_record[field] + if not nested_record: + return None record[self.cursor_field] = record["attributes"][self.cursor_field] return record From 566b72911f5e55e9ee521ebca67de18b908328f4 Mon Sep 17 00:00:00 2001 From: Alexandros Milaios Date: Thu, 20 Jun 2024 17:56:25 +0300 Subject: [PATCH 8/8] feat: klaviyo v2 - fix flows archive issue --- .../connectors/source-klaviyo/source_klaviyo/streams.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py index b06db009dfb9..706bcc39b048 100644 --- a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py +++ b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py @@ -149,7 +149,7 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late current_stream_cursor_value = current_stream_state.get("archived", {}).get(self.cursor_field, self._start_ts) latest_record_cursor_value = latest_record[self.cursor_field] latest_cursor = max(pendulum.parse(latest_record_cursor_value), pendulum.parse(current_stream_cursor_value)) - current_stream_state["archived"] = {self.cursor_field: latest_cursor} + current_stream_state["archived"] = {self.cursor_field: str(latest_cursor)} return current_stream_state else: return super().get_updated_state(current_stream_state, latest_record)