diff --git a/frontend/public/services/brevo.png b/frontend/public/services/brevo.png new file mode 100644 index 0000000000000..65dfdbac8030a Binary files /dev/null and b/frontend/public/services/brevo.png differ diff --git a/posthog/cdp/templates/__init__.py b/posthog/cdp/templates/__init__.py index 2bf9f517aeedc..43917a62e75a1 100644 --- a/posthog/cdp/templates/__init__.py +++ b/posthog/cdp/templates/__init__.py @@ -38,6 +38,7 @@ TemplateGoogleCloudStorageMigrator, ) from .airtable.template_airtable import template as airtable +from .brevo.template_brevo import template as brevo from ._internal.template_broadcast import template_new_broadcast as _broadcast HOG_FUNCTION_TEMPLATES = [ @@ -50,6 +51,7 @@ avo, aws_kinesis, braze, + brevo, clearbit, customerio, discord, diff --git a/posthog/cdp/templates/brevo/template_brevo.py b/posthog/cdp/templates/brevo/template_brevo.py new file mode 100644 index 0000000000000..89ae28007c6f6 --- /dev/null +++ b/posthog/cdp/templates/brevo/template_brevo.py @@ -0,0 +1,81 @@ +from posthog.cdp.templates.hog_function_template import HogFunctionTemplate + +template: HogFunctionTemplate = HogFunctionTemplate( + status="beta", + type="destination", + id="template-brevo", + name="Brevo", + description="Update contacts in Brevo", + icon_url="/static/services/brevo.png", + category=["Email Marketing"], + hog=""" +if (empty(inputs.email)) { + print('No email set. Skipping...') + return +} + +let body := { + 'email': inputs.email, + 'updateEnabled': true, + 'attributes': {} +} + +for (let key, value in inputs.attributes) { + if (not empty(value)) { + body.attributes[key] := value + } +} + +let res := fetch(f'https://api.brevo.com/v3/contacts', { + 'method': 'POST', + 'headers': { + 'api-key': inputs.apiKey, + 'Content-Type': 'application/json', + }, + 'body': body +}) +if (res.status >= 400) { + throw Error(f'Error from api.brevo.com (status {res.status}): {res.body}') +} +""".strip(), + inputs_schema=[ + { + "key": "apiKey", + "type": "string", + "label": "Brevo API Key", + "description": "Check out this page on how to get your API key: https://help.brevo.com/hc/en-us/articles/209467485-Create-and-manage-your-API-keys", + "secret": True, + "required": True, + }, + { + "key": "email", + "type": "string", + "label": "Email of the user", + "description": "Where to find the email for the contact to be created. You can use the filters section to filter out unwanted emails or internal users.", + "default": "{person.properties.email}", + "secret": False, + "required": True, + }, + { + "key": "attributes", + "type": "dictionary", + "label": "Attributes", + "description": "For information on potential attributes, refer to the following page: https://help.brevo.com/hc/en-us/articles/10617359589906-Create-and-manage-contact-attributes", + "default": { + "EMAIL": "{person.properties.email}", + "FIRSTNAME": "{person.properties.firstname}", + "LASTNAME": "{person.properties.lastname}", + }, + "secret": False, + "required": True, + }, + ], + filters={ + "events": [ + {"id": "$identify", "name": "$identify", "type": "events", "order": 0}, + {"id": "$set", "name": "$set", "type": "events", "order": 0}, + ], + "actions": [], + "filter_test_accounts": True, + }, +) diff --git a/posthog/cdp/templates/brevo/test_template_brevo.py b/posthog/cdp/templates/brevo/test_template_brevo.py new file mode 100644 index 0000000000000..839dabbc5a3b0 --- /dev/null +++ b/posthog/cdp/templates/brevo/test_template_brevo.py @@ -0,0 +1,46 @@ +from inline_snapshot import snapshot +from posthog.cdp.templates.helpers import BaseHogFunctionTemplateTest +from posthog.cdp.templates.brevo.template_brevo import ( + template as template_brevo, +) + + +def create_inputs(**kwargs): + inputs = { + "apiKey": "apikey12345", + "email": "max@posthog.com", + "attributes": {"EMAIL": "max@posthog.com", "FIRSTNAME": "Max"}, + } + inputs.update(kwargs) + + return inputs + + +class TestTemplateBrevo(BaseHogFunctionTemplateTest): + template = template_brevo + + def test_function_works(self): + self.run_function(inputs=create_inputs()) + assert self.get_mock_fetch_calls()[0] == snapshot( + ( + "https://api.brevo.com/v3/contacts", + { + "method": "POST", + "headers": { + "api-key": "apikey12345", + "Content-Type": "application/json", + }, + "body": { + "email": "max@posthog.com", + "updateEnabled": True, + "attributes": {"EMAIL": "max@posthog.com", "FIRSTNAME": "Max"}, + }, + }, + ) + ) + + def test_function_requires_identifier(self): + self.run_function(inputs=create_inputs(email="")) + + assert not self.get_mock_fetch_calls() + assert self.get_mock_print_calls() == snapshot([("No email set. Skipping...",)])