Skip to content

Commit

Permalink
chore: add email column for campaign and events Klaviyo v2 Streams
Browse files Browse the repository at this point in the history
  • Loading branch information
am6010 committed Oct 31, 2024
1 parent 011ec41 commit 35470d2
Showing 1 changed file with 28 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ class Campaigns(IncrementalKlaviyoStreamLatest):
cursor_field = "updated_at"
page_size = None
current_channel = None
include = "campaign-messages,tags"

def path(self, *args, next_page_token: Optional[Mapping[str, Any]] = None, **kwargs) -> str:
return "campaigns"
Expand Down Expand Up @@ -475,6 +476,22 @@ def request_params(
params["filter"] = archived_filter
return params

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
response_json = response.json()
included = response_json.get("included", [])
campaign_messages = [record for record in included if record["type"] == "campaign-message"]
campaign_message_cache = {record["id"]: record for record in campaign_messages}
for record in response_json.get("data", []):
relationships = record.get("relationships", {})
campaign_messages_data = relationships.get("campaign-messages", {}).get("data", [])
for idx, campaign_message in enumerate(campaign_messages_data):
campaign_message_id = campaign_message.get("id", None)
if campaign_message_id and campaign_message_id in campaign_message_cache:
message = campaign_message_cache.get(campaign_message_id)
record[f"from_email_{idx}"] = message.get("attributes", {}).get("content", {}).get("from_email", None)

record = self.map_record(record)
yield record

class Lists(IncrementalKlaviyoStreamLatest):
"""Docs: https://developers.klaviyo.com/en/reference/get-lists"""
Expand Down Expand Up @@ -562,15 +579,23 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
""":return an iterable containing each record in the response"""

response_json = response.json()
profiles = [record for record in response_json.get("included", []) if record["type"] == "profile"]
profile_cache = {record["id"]: record for record in profiles}
for record in response_json.get("data", []):
attributes = record["attributes"]
flow = attributes.get("$flow")
flow_message_id = attributes.get("$message")

event_properties = attributes.get("event_properties", {})
flow = event_properties.get("$flow")
flow_message_id = event_properties.get("$message")
record["flow_id"] = flow
record["flow_message_id"] = flow_message_id
record["campaign_id"] = flow_message_id if not flow else None
record[self.cursor_field] = attributes[self.cursor_field]
profiles_data = record.get("relationships", {}).get("profile", {}).get("data", None)
if profiles_data:
profile_id = profiles_data.get("id", None)
if profile_id and profile_id in profile_cache:
profile = profile_cache.get(profile_id)
record["profile_email"] = profile.get("attributes", {}).get("email", None)

yield process_record(record)

Expand Down

0 comments on commit 35470d2

Please sign in to comment.