Skip to content

Commit

Permalink
Merge branch 'main' of ssh://github.com/rudderlabs/airbyte into chore…
Browse files Browse the repository at this point in the history
…-tempZendeskAuthChange
  • Loading branch information
am6010 committed Oct 4, 2023
2 parents dd4e92c + 5e099a8 commit 466603a
Show file tree
Hide file tree
Showing 25 changed files with 346 additions and 434 deletions.
2 changes: 1 addition & 1 deletion airbyte-cdk/python/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM python:3.9.11-alpine3.15 as base
FROM python:3.12.0a1-alpine3.15 as base

# build and load all requirements
FROM base as builder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
MAIN_REQUIREMENTS = [
"airbyte-cdk~=0.36",
"cached_property==1.5.2",
"facebook_business==16.0.0",
"facebook_business==17.0.0",
"pendulum>=2,<3",
]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@

import backoff
import pendulum
import requests
from airbyte_cdk.models import FailureType
from airbyte_cdk.utils import AirbyteTracedException
from cached_property import cached_property
from facebook_business import FacebookAdsApi
from facebook_business.adobjects.adaccount import AdAccount
from facebook_business.api import FacebookResponse
from facebook_business.exceptions import FacebookRequestError
from source_facebook_marketing.streams.common import retry_pattern
from source_facebook_marketing.streams.common import retry_pattern, FACEBOOK_CONNECTION_RESET_ERROR_CODE

logger = logging.getLogger("airbyte")

Expand All @@ -25,7 +26,7 @@ class FacebookAPIException(Exception):
"""General class for all API errors"""


backoff_policy = retry_pattern(backoff.expo, FacebookRequestError, max_tries=5, factor=5)
backoff_policy = retry_pattern(backoff.expo, FacebookRequestError, max_tries=5, factor=120)


class MyFacebookAdsApi(FacebookAdsApi):
Expand All @@ -43,6 +44,7 @@ class Throttle:

# Insights async jobs throttle
_ads_insights_throttle: Throttle
_access_token = None

@property
def ads_insights_throttle(self) -> Throttle:
Expand Down Expand Up @@ -88,6 +90,16 @@ def _parse_call_rate_header(headers):

return usage, pause_interval

@classmethod
def set_access_token(cls, access_token):
cls._access_token = access_token

@classmethod
def reset_session(cls):
logger.info("resetting session after sleep")
api = MyFacebookAdsApi.init(access_token=MyFacebookAdsApi._access_token, crash_log=False)
FacebookAdsApi.set_default_api(api)

def _compute_pause_interval(self, usage, pause_interval):
"""The sleep time will be calculated based on usage consumed."""
if usage >= self.MAX_RATE:
Expand Down Expand Up @@ -123,6 +135,7 @@ def _handle_call_rate_limit(self, response, params):
sleep_time = self._compute_pause_interval(usage=usage, pause_interval=pause_interval)
logger.warning(f"Utilization is too high ({usage})%, pausing for {sleep_time}")
sleep(sleep_time.total_seconds())
MyFacebookAdsApi.reset_session()

def _update_insights_throttle_limit(self, response: FacebookResponse):
"""
Expand Down Expand Up @@ -151,7 +164,21 @@ def call(
api_version=None,
):
"""Makes an API call, delegate actual work to parent class and handles call rates"""
response = super().call(method, path, params, headers, files, url_override, api_version)
try:
response = super().call(method, path, params, headers, files, url_override, api_version)
except FacebookRequestError as exc:
MyFacebookAdsApi.reset_session()
raise exc
except requests.exceptions.ConnectionError as exc:
MyFacebookAdsApi.reset_session()
body = {
"error": {
"code": FACEBOOK_CONNECTION_RESET_ERROR_CODE,
"is_transient": "true",
}
}
raise FacebookRequestError(str(exc), {}, 400, None, json.dumps(body))

self._update_insights_throttle_limit(response)
self._handle_call_rate_limit(response, params)
return response
Expand All @@ -164,6 +191,7 @@ def __init__(self, account_id: str, access_token: str):
self._account_id = account_id
# design flaw in MyFacebookAdsApi requires such strange set of new default api instance
self.api = MyFacebookAdsApi.init(access_token=access_token, crash_log=False)
MyFacebookAdsApi.set_access_token(access_token)
FacebookAdsApi.set_default_api(self.api)

@cached_property
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,6 @@
}
}
},
"has_advertiser_opted_in_odax": {
"type": ["null", "boolean"]
},
"has_migrated_permissions": {
"type": ["null", "boolean"]
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,9 @@ def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) ->
api = API(account_id=config.account_id, access_token=config.access_token)
logger.info(f"Select account {api.account}")
except (requests.exceptions.RequestException, ValidationError, FacebookAPIException) as e:
return False, e
return False, f"error: {repr(e)}"
except AirbyteTracedException as traced_exc:
return False, traced_exc.message

# make sure that we have valid combination of "action_breakdowns" and "breakdowns" parameters
for stream in self.get_custom_insights_streams(api, config):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ def schema_extra(schema: Dict[str, Any], model: Type["ConnectorConfig"]) -> None
page_size: Optional[PositiveInt] = Field(
title="Page Size of Requests",
order=7,
default=100,
default=500,
)

insights_lookback_window: Optional[PositiveInt] = Field(
Expand All @@ -154,6 +154,7 @@ def schema_extra(schema: Dict[str, Any], model: Type["ConnectorConfig"]) -> None
title="Maximum size of Batched Requests",
order=9,
default=50,
maximum=50,
)

action_breakdowns_allow_empty: bool = Field(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def availability_strategy(self) -> Optional["AvailabilityStrategy"]:
def __init__(self, api: "API", include_deleted: bool = False, page_size: int = 100, max_batch_size: int = 50, **kwargs):
super().__init__(**kwargs)
self._api = api
self.page_size = page_size if page_size is not None else 100
self.page_size = page_size if page_size is not None else 500
self._include_deleted = include_deleted if self.enable_deleted else False
self.max_batch_size = max_batch_size if max_batch_size is not None else 50

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class AdCreatives(FBMarketingStream):

entity_prefix = "adcreative"
enable_deleted = False
use_batch = False

def __init__(self, fetch_thumbnail_images: bool = False, **kwargs):
super().__init__(**kwargs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1346,16 +1346,15 @@ def request_params(


class DealPipelines(ClientSideIncrementalStream):
"""Deal pipelines, API v1,
This endpoint requires the contacts scope the tickets scope.
Docs: https://legacydocs.hubspot.com/docs/methods/pipelines/get_pipelines_for_object_type
"""Deal pipelines, API v3,
Docs: https://developers.hubspot.com/docs/api/crm/pipelines
"""

url = "/crm/v3/pipelines/deals"
updated_at_field = "updatedAt"
created_at_field = "createdAt"
cursor_field_datetime_format = "x"
primary_key = "pipelineId"
primary_key = "id"
scopes = {
"crm.schemas.contacts.read",
"crm.objects.contacts.read",
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-intercom/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ COPY source_intercom ./source_intercom
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.2.1
LABEL io.airbyte.version=0.3.0
LABEL io.airbyte.name=airbyte/source-intercom
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ acceptance_tests:
spec:
tests:
- spec_path: "source_intercom/spec.json"
# Spec fix: advanced auth configuration contain `client_id` and `client_secret` fields but they were missing in spec.
backward_compatibility_tests_config:
disable_for_version: "0.2.1"
connection:
tests:
- config_path: "secrets/config.json"
Expand All @@ -15,12 +18,14 @@ acceptance_tests:
discovery:
tests:
- config_path: "secrets/config.json"
# Schema fix: update schemas with undeclared fields which is not breaking change
backward_compatibility_tests_config:
disable_for_version: "0.2.1"
basic_read:
tests:
- config_path: "secrets/config.json"
expect_records:
path: "integration_tests/expected_records.jsonl"
fail_on_extra_columns: false
incremental:
tests:
- config_path: "secrets/config.json"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: d8313939-3782-41b0-be29-b3ca20d8dd3a
dockerImageTag: 0.2.1
dockerImageTag: 0.3.0
dockerRepository: airbyte/source-intercom
githubIssueLabel: source-intercom
icon: intercom.svg
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-intercom/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from setuptools import find_packages, setup

MAIN_REQUIREMENTS = [
"airbyte-cdk==0.29.0",
"airbyte-cdk",
]

TEST_REQUIREMENTS = [
Expand Down
Loading

0 comments on commit 466603a

Please sign in to comment.