diff --git a/README.md b/README.md index a785b7c..04a0b26 100644 --- a/README.md +++ b/README.md @@ -37,13 +37,15 @@ pipeline_id = 'my_pipeline_id' client = HubSpotClient( access_token=access_token, - PIPELINE_ID=pipeline_id, + pipeline_id=pipeline_id, ) ``` You can also set the environment variables `HUBSPOT_ACCESS_TOKEN` and `HUBS_PIPELINE_ID` which will be used as defaults if no access_token or -pipeline_id are passed to the `HubSpotClient`. +pipeline_id are passed to the `HubSpotClient`. This can be done by copying +the .env.template file from `hs_api\.env.template` into the root of the +project and renaming it to .env. More details on how to use the client can be found in the test cases that @@ -77,7 +79,7 @@ normal, which will kick off the github actions to run the linting and tests. Be aware that a couple of the tests can be flakey due to the delay in the asynchronous way hubspot returns results and actually applies them to the -underlying data. There are dealys in place to account for this but there can +underlying data. There are delays in place to account for this but there can be cases where a test fails because a record appears to have not been created. This probably needs reworking, but feel free to re-run the tests. diff --git a/hs_api/.env.template b/hs_api/.env.template index 42d945b..0087396 100644 --- a/hs_api/.env.template +++ b/hs_api/.env.template @@ -1,5 +1,5 @@ HUBSPOT_ACCESS_TOKEN= -HUBS_PIPELINE_ID= +HUBSPOT_TEST_PIPELINE_ID= HUBSPOT_TEST_ACCESS_TOKEN= -HUBS_TEST_PIPELINE_ID= +HUBSPOT_TEST_PIPELINE_ID= diff --git a/hs_api/api/hubspot_api.py b/hs_api/api/hubspot_api.py index a795a78..b4c28d0 100644 --- a/hs_api/api/hubspot_api.py +++ b/hs_api/api/hubspot_api.py @@ -1,6 +1,8 @@ import time +from collections.abc import Generator +from datetime import datetime +from typing import Dict, Optional -import requests from hubspot import HubSpot from hubspot.auth.oauth import ApiException from hubspot.crm.contacts import ( @@ -24,7 +26,7 @@ } BATCH_LIMITS = 50 -EMAIL_BATCH_LIMIT = 1000 +EMAIL_BATCH_LIMIT = 10 RETRY_LIMIT = 3 RETRY_WAIT = 60 @@ -64,6 +66,8 @@ def create_lookup(self): "contact": self._client.crm.contacts.basic_api.create, "company": self._client.crm.companies.basic_api.create, "deal": self._client.crm.deals.basic_api.create, + "ticket": self._client.crm.tickets.basic_api.create, + "email": self._client.crm.objects.emails.basic_api.create, } @property @@ -71,6 +75,7 @@ def search_lookup(self): return { "contact": self._client.crm.contacts.search_api.do_search, "company": self._client.crm.companies.search_api.do_search, + "email": self._client.crm.objects.emails.search_api.do_search, } @property @@ -112,14 +117,17 @@ def pipeline_details(self, pipeline_id=None, return_all_pipelines=False): pipelines = [x for x in pipelines if x.id == pipeline_id] return pipelines - def _find(self, object_name, property_name, value, sort): - query = Filter(property_name=property_name, operator="EQ", value=value) - filter_groups = [FilterGroup(filters=[query])] + def _find(self, object_name, property_name, value, sort, limit=20, after=0): + filter_groups = None + if property_name and value: + query = Filter(property_name=property_name, operator="EQ", value=value) + filter_groups = [FilterGroup(filters=[query])] public_object_search_request = PublicObjectSearchRequest( - limit=20, + limit=limit, filter_groups=filter_groups, sorts=sort, + after=after, ) response = self.search_lookup[object_name]( @@ -150,19 +158,71 @@ def _update(self, object_name, object_id, properties): print(f"Exception when updating {object_name}: {e}\n") def find_contact(self, property_name, value): - sort = [{"propertyName": "hs_object_id", "direction": "ASCENDING"}] response = self._find("contact", property_name, value, sort) return response.results - def find_company(self, property_name, value): + def find_contact_iter( + self, property_name: str, value: str, limit: int = 20 + ) -> Generator[Dict, None, None]: + """ + Searches for a contact in Hubspot and returns results as a generator + + :param property_name: The field name from Hubspot + :param value: The value to search in the field property_name + :param limit: The number of results to return per iteration + :return: Dictionary of results + """ + sort = [{"propertyName": "hs_object_id", "direction": "ASCENDING"}] + after = 0 + while True: + response = self._find( + "contact", property_name, value, sort, limit=limit, after=after + ) + if not response.results: + break + + yield response.results + + if not response.paging: + break + after = response.paging.next.after + + def find_company(self, property_name, value): sort = [{"propertyName": "hs_lastmodifieddate", "direction": "DESCENDING"}] response = self._find("company", property_name, value, sort) return response.results + def find_company_iter( + self, property_name: str, value: str, limit: int = 20 + ) -> Generator[Dict, None, None]: + """ + Searches for a company in Hubspot and returns results as a generator + + :param property_name: The field name from Hubspot + :param value: The value to search in the field property_name + :param limit: The number of results to return per iteration + :return: Dictionary of results + """ + sort = [{"propertyName": "hs_lastmodifieddate", "direction": "DESCENDING"}] + after = 0 + + while True: + response = self._find( + "company", property_name, value, sort, limit=limit, after=after + ) + if not response.results: + break + + yield response.results + + if not response.paging: + break + after = response.paging.next.after + def find_deal(self, property_name, value): pipeline_filter = Filter( property_name="pipeline", operator="EQ", value=self.pipeline_id @@ -195,6 +255,16 @@ def _find_owner_by_id(self, owner_id): response = self._client.crm.owners.owners_api.get_by_id(owner_id=owner_id) return response + def find_all_owners(self): + after = None + while True: + response = self._client.crm.owners.owners_api.get_page(after=after) + yield response + + if not response.paging: + break + after = response.paging.next.after + def find_owner(self, property_name, value): if property_name not in ("id", "email"): raise NameError( @@ -206,7 +276,9 @@ def find_owner(self, property_name, value): if property_name == "email": return self._find_owner_by_email(email=value) - def find_all_email_events(self, filter_name=None, filter_value=None): + def find_all_email_events( + self, filter_name=None, filter_value=None, limit=EMAIL_BATCH_LIMIT, **parameters + ): """ Finds and returns all email events, using the filter name and value as the high watermark for the events to return. If None are provided, it @@ -215,40 +287,32 @@ def find_all_email_events(self, filter_name=None, filter_value=None): This iterates over batches, using the previous batch as the new high watermark for the next batch to be returned until there are no more records or batches to return. - - NOTE: This currently uses the requests library to use the v1 api for the - events as there is currently as per the Hubspot website - https://developers.hubspot.com/docs/api/events/email-analytics. - Once this is released we can transition over to using that. """ + sort = [{"propertyName": "hs_lastmodifieddate", "direction": "DESCENDING"}] retry = 0 - offset = None + after = None while True: try: - params = { - "limit": EMAIL_BATCH_LIMIT, - "offset": offset, - } if filter_name: - params[filter_name] = filter_value - - response = requests.get( - "https://api.hubapi.com/email/public/v1/events", - headers={"Authorization": f"Bearer {self._access_token}"}, - params=params, + parameters[filter_name] = filter_value + + resp = self._find( + "email", + property_name=filter_name, + value=filter_value, + limit=limit, + after=after, + sort=sort, ) - response.raise_for_status() - - response_json = response.json() + if not resp.results: + break - yield response_json.get("events", []) + yield resp.results - # Update after to page onto next batch if there is next otherwise break as - # there are no more batches to iterate over. - offset = response_json.get("offset", False) - if not response_json.get("hasMore", False): + if not resp.paging: break - retry = 0 + after = resp.paging.next.after + except HTTPError as e: status_code = e.response.status_code if retry >= RETRY_LIMIT: @@ -319,6 +383,9 @@ def find_all_tickets( else: after = None + def find_ticket(self, ticket_id): + return self._client.crm.tickets.basic_api.get_by_id(ticket_id) + def find_all_deals( self, filter_name=None, @@ -374,10 +441,9 @@ def find_all_deals( # Update after to page onto next batch if there is next otherwise break as # there are no more batches to iterate over. - if response.paging: - after = response.paging.next.after - else: - after = None + if not response.paging: + break + after = response.paging.next.after def create_contact(self, email, first_name, last_name, **properties): properties = dict( @@ -428,6 +494,45 @@ def create_deal( ) return response + def create_ticket(self, subject, **properties): + properties = dict(subject=subject, **properties) + response = self._create("ticket", properties) + return response + + def create_email( + self, + hs_timestamp: Optional[datetime] = None, + hs_email_direction: Optional[str] = "EMAIL", + **properties, + ): + """ + See documentation at https://developers.hubspot.com/docs/api/crm/email + + :param hs_timestamp: This field marks the email's time of creation and determines where the email sits on the + record timeline. You can use either a Unix timestamp in milliseconds or UTC format. If not provided, then the + current time is used. + :param hs_email_direction: The direction the email was sent in. Possible values include: + + EMAIL: the email was sent from the CRM or sent and logged to the CRM with the BCC address. + INCOMING_EMAIL: the email was a reply to a logged outgoing email. + + FORWARDED_EMAIL: the email was forwarded to the CRM. + :param properties: Dictionary of properties as documented on hubspot + :return: + """ + if not hs_timestamp: + hs_timestamp = int(datetime.now().timestamp()) + else: + hs_timestamp = int(hs_timestamp.timestamp()) + + properties = dict( + hs_timestamp=hs_timestamp, + hs_email_direction=hs_email_direction, + **properties, + ) + response = self._create("email", properties) + return response + def delete_contact(self, value, property_name=None): try: public_gdpr_delete_input = PublicGdprDeleteInput( @@ -455,6 +560,20 @@ def delete_deal(self, deal_id): except ApiException as e: print(f"Exception when deleting deal: {e}\n") + def delete_ticket(self, ticket_id): + try: + api_response = self._client.crm.tickets.basic_api.archive(ticket_id) + return api_response + except ApiException as e: + print(f"Exception when deleting ticket: {e}\n") + + def delete_email(self, email_id): + try: + api_response = self._client.crm.objects.emails.basic_api.archive(email_id) + return api_response + except ApiException as e: + print(f"Exception when deleting email: {e}\n") + def update_company(self, object_id, **properties): response = self._update("company", object_id, properties) return response @@ -488,7 +607,7 @@ def create_association( from_object_id, to_object_type, to_object_id, - get_association_id(from_object_type, to_object_type), + [], ) return result diff --git a/requirements.txt b/requirements.txt index e9f6f99..be78bd4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,8 +1,9 @@ requests -isort==5.10.1 -flake8==4.0.1 -black==22.3.0 -pytest==6.2.5 -python-dotenv==0.19.2 -hubspot-api-client==5.0.1 +isort==5.12.0 +flake8==6.0.0 +black==23.1.0 +pytest==7.2.2 +python-dotenv==1.0.0 +hubspot-api-client==7.5.0 +tenacity==8.2.2 diff --git a/setup.py b/setup.py index 4bd2ece..3e230a1 100644 --- a/setup.py +++ b/setup.py @@ -8,6 +8,6 @@ description="Superscript Hubspot API", author="Superscript", author_email="paul.lucas@gosuperscript.com", - install_requires=["requests", "python-dotenv==0.19.2", "hubspot-api-client==5.0.1"], + install_requires=["requests", "python-dotenv==0.19.2", "hubspot-api-client==7.5.0"], packages=find_packages(include=["hs_api*"]), ) diff --git a/tests/api/test_integration_hubspot_api.py b/tests/api/test_integration_hubspot_api.py index af65e11..7a3e7fc 100644 --- a/tests/api/test_integration_hubspot_api.py +++ b/tests/api/test_integration_hubspot_api.py @@ -1,49 +1,13 @@ -import datetime -import time - import pytest +from tenacity import retry, stop_after_attempt, wait_fixed -from hs_api.api.hubspot_api import EMAIL_BATCH_LIMIT, BATCH_LIMITS, HubSpotClient +from hs_api.api.hubspot_api import BATCH_LIMITS, EMAIL_BATCH_LIMIT, HubSpotClient from hs_api.settings.settings import ( HUBSPOT_TEST_ACCESS_TOKEN, HUBSPOT_TEST_PIPELINE_ID, HUBSPOT_TEST_TICKET_PIPELINE_ID, ) -# Test Pipeline - -current_timestamp = datetime.datetime.now() -UNIQUE_ID = f"{current_timestamp:%Y%m%d%H%M%S%f}" - -TEST_COMPANY_NAME = f"{UNIQUE_ID} company" -TEST_EMAIL = f"{UNIQUE_ID}@email.com" -TEST_EMAIL_CUSTOM_DOMAIN = f"{UNIQUE_ID}@domain{UNIQUE_ID}.ai" -TEST_DEAL_NAME = f"{UNIQUE_ID} deal name" - - -def clear_down_test_objects(client): - companies = client.find_company("name", TEST_COMPANY_NAME) - for company in companies: - client.delete_company(company.id) - - deals = client.find_deal("dealname", TEST_DEAL_NAME) - for deal in deals: - client.delete_deal(deal.id) - - client.delete_contact(value=TEST_EMAIL, property_name="email") - client.delete_contact(value=TEST_EMAIL_CUSTOM_DOMAIN, property_name="email") - - -@pytest.fixture() -def hubspot_client(): - client = HubSpotClient( - access_token=HUBSPOT_TEST_ACCESS_TOKEN, pipeline_id=HUBSPOT_TEST_PIPELINE_ID - ) - try: - yield client - finally: - clear_down_test_objects(client) - def test_pipeline_id_none_raises_value_error(): with pytest.raises(ValueError): @@ -51,247 +15,304 @@ def test_pipeline_id_none_raises_value_error(): client.pipeline_stages -def test_create_and_find_contact(hubspot_client): - - test_first_name = f"{UNIQUE_ID} first name" - test_last_name = f"{UNIQUE_ID} last name" - test_phone = f"{UNIQUE_ID}" - +def test_create_and_find_contact( + hubspot_client, first_name, last_name, email, phone, company_name +): # Assert the contact doesn't already exist - contact = hubspot_client.find_contact("email", TEST_EMAIL) + contact = hubspot_client.find_contact("email", email) assert not contact # Create the contact contact_result = hubspot_client.create_contact( - email=TEST_EMAIL, - first_name=test_first_name, - last_name=test_last_name, - phone=test_phone, - company=TEST_COMPANY_NAME, + email=email, + first_name=first_name, + last_name=last_name, + phone=phone, + company=company_name, ) assert contact_result assert contact_result.id + @retry(stop=stop_after_attempt(7), wait=wait_fixed(2)) + def _get_contact(): + _contact = hubspot_client.find_contact("hs_object_id", contact_result.id) + assert _contact + # Assert the contact now exists based on previous creation - time.sleep(10) - contact = hubspot_client.find_contact("hs_object_id", contact_result.id) - assert contact + _get_contact() + + +def test_create_and_find_company(hubspot_client, company_name, domain): + # Assert the company doesn't already exist + company = hubspot_client.find_company("name", company_name) + assert not company + # Create the company + company_result = hubspot_client.create_company(name=company_name, domain=domain) + + assert company_result + assert company_result.id + + @retry(stop=stop_after_attempt(7), wait=wait_fixed(2)) + def _test(): + _company = hubspot_client.find_company("hs_object_id", company_result.id) + assert _company + + # Assert the company now exists based on previous creation + _test() -def test_create_and_find_company(hubspot_client): - test_domain = f"{UNIQUE_ID}.test" +def test_create_and_find_company_iter(hubspot_client, domain, unique_id): + company_name = f"{unique_id} test_create_and_find_company_iter" # Assert the company doesn't already exist - company = hubspot_client.find_company("name", TEST_COMPANY_NAME) - assert not company + with pytest.raises(StopIteration): + next(hubspot_client.find_company_iter("name", company_name)) # Create the company - company_result = hubspot_client.create_company( - name=TEST_COMPANY_NAME, domain=test_domain - ) + company_result = hubspot_client.create_company(name=company_name, domain=domain) assert company_result assert company_result.id + @retry(stop=stop_after_attempt(7), wait=wait_fixed(2)) + def _test(): + _company = next( + hubspot_client.find_company_iter("hs_object_id", company_result.id) + ) + assert _company + # Assert the company now exists based on previous creation - time.sleep(7) - company = hubspot_client.find_company("hs_object_id", company_result.id) - assert company + _test() + + +def test_create_and_find_ticket(hubspot_client: HubSpotClient, unique_id): + ticket_name = f"{unique_id}0 ticket name" + ticket = hubspot_client.find_all_tickets( + filter_name="subject", filter_value=ticket_name + ) + assert ticket + + +def test_create_and_find_email(hubspot_client: HubSpotClient): + email_resp = hubspot_client.find_all_email_events() + assert next(email_resp) def test_create_contact_and_associated_company_with_auto_created_company( hubspot_client, + first_name, + last_name, + email_custom_domain, + phone, + unique_id, ): - - test_first_name = f"{UNIQUE_ID} first name" - test_last_name = f"{UNIQUE_ID} last name" - test_phone = f"{UNIQUE_ID}" - + company_name = f"{unique_id} test_create_contact_and_associated_company_with_auto_created_company" # Assert the company and contact don't already exist - company = hubspot_client.find_company("name", TEST_COMPANY_NAME) + company = hubspot_client.find_company("name", company_name) assert not company - contact = hubspot_client.find_contact("email", TEST_EMAIL_CUSTOM_DOMAIN) + email_custom_domain = ( + f"{unique_id}@testcreatecontactandassociatedcompanywithautocreatedcompany.com" + ) + contact = hubspot_client.find_contact("email", email_custom_domain) assert not contact # Create the contact and company result = hubspot_client.create_contact_and_company( - email=TEST_EMAIL_CUSTOM_DOMAIN, - first_name=test_first_name, - last_name=test_last_name, - phone=test_phone, - company=TEST_COMPANY_NAME, + email=email_custom_domain, + first_name=first_name, + last_name=last_name, + phone=phone, + company=company_name, ) assert result assert result["contact"].id assert result["company"].id + @retry(stop=stop_after_attempt(7), wait=wait_fixed(2)) + def _test(): + _company = hubspot_client.find_company("hs_object_id", result["company"].id) + assert _company + assert _company[0].properties["name"] == company_name + + _contact = hubspot_client.find_contact("email", email_custom_domain) + assert _contact + # Assert the company and contact now exists based on previous creation # and are linked - time.sleep(7) - company = hubspot_client.find_company("hs_object_id", result["company"].id) - assert company - assert company[0].properties["name"] == TEST_COMPANY_NAME - - contact = hubspot_client.find_contact("email", TEST_EMAIL_CUSTOM_DOMAIN) - assert contact + _test() def test_create_contact_and_associated_company_without_auto_created_company( - hubspot_client, + hubspot_client, first_name, last_name, phone, unique_id ): - - test_first_name = f"{UNIQUE_ID} first name" - test_last_name = f"{UNIQUE_ID} last name" - test_phone = f"{UNIQUE_ID}" - # Assert the company and contact don't already exist - company = hubspot_client.find_company("name", TEST_COMPANY_NAME) + company_name = f"{unique_id} test_create_contact_and_associated_company_without_auto_created_company" + company = hubspot_client.find_company("name", company_name) assert not company - contact = hubspot_client.find_contact("email", TEST_EMAIL) + email = f"{unique_id}@testcreatecontactandassociatedcompanywithoutautocreatedcompany.com" + contact = hubspot_client.find_contact("email", email) assert not contact # Create the contact and company result = hubspot_client.create_contact_and_company( - email=TEST_EMAIL, - first_name=test_first_name, - last_name=test_last_name, - phone=test_phone, - company=TEST_COMPANY_NAME, + email=email, + first_name=first_name, + last_name=last_name, + phone=phone, + company=company_name, ) assert result assert result["contact"].id assert result["company"].id - # Assert the company and contact now exists based on previous creation - # and are linked - time.sleep(7) - company = hubspot_client.find_company("hs_object_id", result["company"].id) - assert company - assert company[0].properties["name"] == TEST_COMPANY_NAME + @retry(stop=stop_after_attempt(7), wait=wait_fixed(2)) + def _test(): + _company = hubspot_client.find_company("hs_object_id", result["company"].id) + assert _company + assert _company[0].properties["name"] == company_name - contact = hubspot_client.find_contact("email", TEST_EMAIL) - assert contact + _contact = hubspot_client.find_contact("email", email) + assert _contact - association = hubspot_client.contact_associations(result["contact"].id, "company") - assert association - assert association[0].id == result["company"].id + _association = hubspot_client.contact_associations( + result["contact"].id, "company" + ) + assert _association + assert _association[0].to_object_id == int(result["company"].id) + # Assert the company and contact now exists based on previous creation + # and are linked + _test() -def test_create_and_find_deal(hubspot_client): +def test_create_and_find_deal(hubspot_client, unique_id): test_amount = 99.99 # Assert the deal doesn't already exist - deal = hubspot_client.find_deal("dealname", TEST_DEAL_NAME) + deal_name = f"{unique_id} test_create_and_find_deal" + deal = hubspot_client.find_deal("dealname", deal_name) assert not deal # Create the deal deal_result = hubspot_client.create_deal( - name=TEST_DEAL_NAME, + name=deal_name, amount=test_amount, ) assert deal_result assert deal_result.id - # Assert the deal now exists based on previous creation - time.sleep(7) - deal = hubspot_client.find_deal("dealname", TEST_DEAL_NAME) - assert deal + @retry(stop=stop_after_attempt(7), wait=wait_fixed(2)) + def _test(): + _deal = hubspot_client.find_deal("dealname", deal_name) + assert _deal + # Assert the deal now exists based on previous creation + _test() -def test_create_deal_for_company(hubspot_client): +def test_create_deal_for_company(hubspot_client, unique_id): test_amount = 99.99 # Assert the deal and company don't already exist - deal = hubspot_client.find_deal("dealname", TEST_DEAL_NAME) + deal_name = f"{unique_id} test_create_deal_for_company" + deal = hubspot_client.find_deal("dealname", deal_name) assert not deal - company = hubspot_client.find_company("name", TEST_COMPANY_NAME) + company_name = f"{unique_id} test_create_deal_for_company" + company = hubspot_client.find_company("name", company_name) assert not company # Create the company - company_result = hubspot_client.create_company(name=TEST_COMPANY_NAME) + company_result = hubspot_client.create_company(name=company_name) # Create the deal deal_result = hubspot_client.create_deal( - name=TEST_DEAL_NAME, amount=test_amount, company_id=company_result.id + name=deal_name, amount=test_amount, company_id=company_result.id ) assert deal_result assert deal_result.id - # Assert the company and deal now exists based on previous creation - # and are linked - time.sleep(7) - deal = hubspot_client.find_deal("dealname", TEST_DEAL_NAME) - assert deal + @retry(stop=stop_after_attempt(7), wait=wait_fixed(2)) + def _test(): + _deal = hubspot_client.find_deal("dealname", deal_name) + assert _deal - company = hubspot_client.find_company("hs_object_id", company_result.id) - assert company + _company = hubspot_client.find_company("hs_object_id", company_result.id) + assert _company - association = hubspot_client.deal_associations(deal_result.id, "company") - assert association - assert association[0].id == company_result.id + _association = hubspot_client.deal_associations(deal_result.id, "company") + assert _association + assert _association[0].to_object_id == int(company_result.id) + # Assert the company and deal now exists based on previous creation + # and are linked + _test() -def test_create_deal_for_contact(hubspot_client): - test_first_name = f"{UNIQUE_ID} first name" - test_last_name = f"{UNIQUE_ID} last name" - test_phone = f"{UNIQUE_ID}" +def test_create_deal_for_contact( + hubspot_client, first_name, last_name, phone, company_name, unique_id +): test_amount = 99.99 # Assert the deal and company don't already exist - deal = hubspot_client.find_deal("dealname", TEST_DEAL_NAME) + deal_name = f"{unique_id} test_create_deal_for_contact" + deal = hubspot_client.find_deal("dealname", deal_name) assert not deal - contact = hubspot_client.find_contact("email", TEST_EMAIL) + email = f"{unique_id}@testcreatedealforcontact.com" + contact = hubspot_client.find_contact("email", email) assert not contact # Create the contact contact_result = hubspot_client.create_contact( - email=TEST_EMAIL, - first_name=test_first_name, - last_name=test_last_name, - phone=test_phone, - company=TEST_COMPANY_NAME, + email=email, + first_name=first_name, + last_name=last_name, + phone=phone, + company=company_name, ) # Create the deal deal_result = hubspot_client.create_deal( - name=TEST_DEAL_NAME, amount=test_amount, contact_id=contact_result.id + name=deal_name, amount=test_amount, contact_id=contact_result.id ) assert deal_result assert deal_result.id - # Assert the company and deal now exists based on previous creation - # and are linked - time.sleep(7) - deal = hubspot_client.find_deal("dealname", TEST_DEAL_NAME) - assert deal + @retry(stop=stop_after_attempt(7), wait=wait_fixed(2)) + def _test(): + _deal = hubspot_client.find_deal("dealname", deal_name) + assert _deal - company = hubspot_client.find_contact("hs_object_id", contact_result.id) - assert company + _company = hubspot_client.find_contact("hs_object_id", contact_result.id) + assert _company - association = hubspot_client.deal_associations(deal_result.id, "contact") - assert association - assert association[0].id == contact_result.id + _association = hubspot_client.deal_associations(deal_result.id, "contact") + assert _association + assert _association[0].to_object_id == int(contact_result.id) + + # Assert the company and deal now exists based on previous creation + # and are linked + _test() def test_find_owner_by_email(hubspot_client): - # This test relies on owner_id "49185288" existing in the testing environment. - owner = hubspot_client.find_owner("email", "lovely-whole.abcaebiz@mailosaur.io") + owners = next(hubspot_client.find_all_owners()) + oid = owners.results[0].id + oemail = owners.results[0].email + + owner = hubspot_client.find_owner("email", oemail) assert owner - assert owner.id == "49185288" + assert owner.id == oid def test_find_owner_not_found_returns_none(hubspot_client): @@ -300,10 +321,13 @@ def test_find_owner_not_found_returns_none(hubspot_client): def test_find_owner_by_id(hubspot_client): - # This test relies on owner_id "49185288" existing in the testing environment. - owner = hubspot_client.find_owner("id", "49185288") + owners = next(hubspot_client.find_all_owners()) + oid = owners.results[0].id + oemail = owners.results[0].email + + owner = hubspot_client.find_owner("id", oid) assert owner - assert owner.email == "lovely-whole.abcaebiz@mailosaur.io" + assert owner.email == oemail def test_find_owner_without_id_or_email(hubspot_client): @@ -311,21 +335,25 @@ def test_find_owner_without_id_or_email(hubspot_client): hubspot_client.find_owner("some_id", "some_value") -def test_find_all_tickets_returns_batches(hubspot_client): - tickets = hubspot_client.find_all_tickets() +def test_find_all_tickets_returns_batches(hubspot_client: HubSpotClient): + @retry(stop=stop_after_attempt(7), wait=wait_fixed(2)) + def _test(): + tickets = hubspot_client.find_all_tickets() - # Assert that the first batch contains the limit of records - # for a batch - initial_batch = next(tickets) - assert len(initial_batch) == BATCH_LIMITS + # Assert that the first batch contains the limit of records + # for a batch + initial_batch = next(tickets) + assert len(initial_batch) == BATCH_LIMITS - following_batch = next(tickets) + following_batch = next(tickets) - # Assert that the next batch follows on from the previous - assert following_batch[0].updated_at > initial_batch[-1].updated_at + # Assert that the next batch follows on from the previous + assert following_batch[0].updated_at > initial_batch[-1].updated_at + _test() -def test_find_all_tickets_returns_default_properties(hubspot_client): + +def test_find_all_tickets_returns_default_properties(hubspot_client: HubSpotClient): tickets = hubspot_client.find_all_tickets() actual = next(tickets)[0].properties expected = { @@ -344,7 +372,7 @@ def test_find_all_tickets_returns_default_properties(hubspot_client): assert actual.keys() == expected.keys() -def test_find_all_tickets_returns_given_properties(hubspot_client): +def test_find_all_tickets_returns_given_properties(hubspot_client: HubSpotClient): tickets = hubspot_client.find_all_tickets( properties=["hs_lastmodifieddate", "hs_object_id"] ) @@ -360,7 +388,9 @@ def test_find_all_tickets_returns_given_properties(hubspot_client): assert actual.keys() == expected.keys() -def test_find_all_tickets_returns_after_given_hs_lastmodifieddate(hubspot_client): +def test_find_all_tickets_returns_after_given_hs_lastmodifieddate( + hubspot_client: HubSpotClient, +): all_tickets = hubspot_client.find_all_tickets() filter_value = next(all_tickets)[0].updated_at filtered_tickets = hubspot_client.find_all_tickets( @@ -373,7 +403,9 @@ def test_find_all_tickets_returns_after_given_hs_lastmodifieddate(hubspot_client assert next(filtered_tickets)[0].updated_at > filter_value -def test_find_all_tickets_returns_after_given_hs_object_id(hubspot_client): +def test_find_all_tickets_returns_after_given_hs_object_id( + hubspot_client: HubSpotClient, +): all_tickets = hubspot_client.find_all_tickets() filter_value = next(all_tickets)[0].id filtered_tickets = hubspot_client.find_all_tickets( @@ -386,7 +418,7 @@ def test_find_all_tickets_returns_after_given_hs_object_id(hubspot_client): assert next(filtered_tickets)[0].id > filter_value -def test_find_all_tickets_returns_for_given_pipeline_id(hubspot_client): +def test_find_all_tickets_returns_for_given_pipeline_id(hubspot_client: HubSpotClient): all_tickets = hubspot_client.find_all_tickets( pipeline_id=HUBSPOT_TEST_TICKET_PIPELINE_ID ) @@ -515,11 +547,9 @@ def test_find_all_email_events_returns_batches(hubspot_client): def test_find_all_email_events_returns_after_given_starttimestamp_epoch(hubspot_client): all_events = hubspot_client.find_all_email_events() - filter_value = next(all_events)[-1]["created"] - filtered_events = hubspot_client.find_all_email_events( - filter_name="startTimestamp", - filter_value=filter_value + 1, - ) + filter_value = next(all_events)[-1].created_at + parameters = dict(created_after=filter_value.timestamp() + 1) + filtered_events = hubspot_client.find_all_email_events(**parameters) # Assert that the first record of the returned filtered list starts # after the original returned list - assert next(filtered_events)[0]["created"] > filter_value + assert next(filtered_events)[0].created_at > filter_value diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..86bd204 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,129 @@ +import datetime + +import pytest + +from hs_api.api.hubspot_api import BATCH_LIMITS, EMAIL_BATCH_LIMIT, HubSpotClient +from hs_api.settings.settings import ( + HUBSPOT_TEST_ACCESS_TOKEN, + HUBSPOT_TEST_PIPELINE_ID, + HUBSPOT_TEST_TICKET_PIPELINE_ID, +) + + +@pytest.fixture(scope="session") +def hubspot_client(deal_name, company_name, email, email_custom_domain, unique_id): + client = HubSpotClient( + access_token=HUBSPOT_TEST_ACCESS_TOKEN, pipeline_id=HUBSPOT_TEST_PIPELINE_ID + ) + test_deal = None + test_tickets = None + test_emails = [] + + try: + # create Hubspot elements + test_deal = client.create_deal(deal_name) + test_tickets = create_tickets(client, unique_id, BATCH_LIMITS + 1) + test_emails = create_emails(client, EMAIL_BATCH_LIMIT + 1) + + yield client + + finally: + # clean up all created elements + if test_deal: + client.delete_deal(test_deal.id) + + if test_tickets: + for tid in test_tickets: + hubspot_client.delete_ticket(tid) + + for tid in test_emails: + client.delete_email(tid) + + clear_down_test_objects( + client, company_name, deal_name, email, email_custom_domain + ) + + +def create_emails(client, quantity): + test_emails = [] + for _ in range(quantity): + test_emails.append(client.create_email().id) + return test_emails + + +def create_tickets(client, unique_id, quantity): + ticket_ids = [] + for i in range(quantity): + test_ticket_name = f"{unique_id}{i} ticket name" + properties = dict( + subject=test_ticket_name, + hs_pipeline=HUBSPOT_TEST_TICKET_PIPELINE_ID, + hs_pipeline_stage=1, + hs_ticket_priority="HIGH", + ) + + ticket_result = client.create_ticket(**properties) + assert ticket_result + assert ticket_result.id + return ticket_ids + + +def clear_down_test_objects( + client, company_name, deal_name, email, email_custom_domain +): + companies = client.find_company("name", company_name) + for company in companies: + client.delete_company(company.id) + + deals = client.find_deal("dealname", deal_name) + for deal in deals: + client.delete_deal(deal.id) + + client.delete_contact(value=email, property_name="email") + client.delete_contact(value=email_custom_domain, property_name="email") + + +@pytest.fixture(scope="session") +def unique_id(): + current_timestamp = datetime.datetime.now() + return f"{current_timestamp:%Y%m%d%H%M%S%f}" + + +@pytest.fixture(scope="session") +def company_name(unique_id): + return f"{unique_id} company" + + +@pytest.fixture(scope="session") +def email(unique_id): + return f"{unique_id}@email.com" + + +@pytest.fixture(scope="session") +def email_custom_domain(unique_id): + return f"{unique_id}@domain{unique_id}.ai" + + +@pytest.fixture(scope="session") +def domain(unique_id): + return f"{unique_id}.test" + + +@pytest.fixture(scope="session") +def deal_name(unique_id): + return f"{unique_id} deal name" + + +@pytest.fixture(scope="session") +def first_name(unique_id): + return f"{unique_id} first name" + + +@pytest.fixture(scope="session") +def last_name(unique_id): + return f"{unique_id} last name" + + +@pytest.fixture(scope="session") +def phone(unique_id): + return f"{unique_id}"