Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: reset session in every sleep #160

Merged
merged 8 commits into from
Sep 21, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from facebook_business.adobjects.adaccount import AdAccount
from facebook_business.api import FacebookResponse
from facebook_business.exceptions import FacebookRequestError
from source_facebook_marketing.streams.common import retry_pattern
from source_facebook_marketing.streams.common import retry_pattern, FACEBOOK_CONNECTION_RESET_ERROR_CODE

logger = logging.getLogger("airbyte")

Expand All @@ -25,7 +25,7 @@ class FacebookAPIException(Exception):
"""General class for all API errors"""


backoff_policy = retry_pattern(backoff.expo, FacebookRequestError, max_tries=5, factor=5)
backoff_policy = retry_pattern(backoff.expo, FacebookRequestError, max_tries=5, factor=120)


class MyFacebookAdsApi(FacebookAdsApi):
Expand All @@ -43,6 +43,7 @@ class Throttle:

# Insights async jobs throttle
_ads_insights_throttle: Throttle
_access_token = None

@property
def ads_insights_throttle(self) -> Throttle:
Expand Down Expand Up @@ -88,6 +89,16 @@ def _parse_call_rate_header(headers):

return usage, pause_interval

@classmethod
def set_access_token(cls, access_token):
cls._access_token = access_token

@classmethod
def reset_session(cls):
logger.info("resetting session after sleep")
api = MyFacebookAdsApi.init(access_token=MyFacebookAdsApi._access_token, crash_log=False)
FacebookAdsApi.set_default_api(api)

def _compute_pause_interval(self, usage, pause_interval):
"""The sleep time will be calculated based on usage consumed."""
if usage >= self.MAX_RATE:
Expand Down Expand Up @@ -123,6 +134,7 @@ def _handle_call_rate_limit(self, response, params):
sleep_time = self._compute_pause_interval(usage=usage, pause_interval=pause_interval)
logger.warning(f"Utilization is too high ({usage})%, pausing for {sleep_time}")
sleep(sleep_time.total_seconds())
MyFacebookAdsApi.reset_session()

def _update_insights_throttle_limit(self, response: FacebookResponse):
"""
Expand Down Expand Up @@ -151,7 +163,21 @@ def call(
api_version=None,
):
"""Makes an API call, delegate actual work to parent class and handles call rates"""
response = super().call(method, path, params, headers, files, url_override, api_version)
try:
response = super().call(method, path, params, headers, files, url_override, api_version)
except FacebookRequestError as exc:
MyFacebookAdsApi.reset_session()
raise exc
except ConnectionResetError as exc:
MyFacebookAdsApi.reset_session()
body = {
"error": {
"code": FACEBOOK_CONNECTION_RESET_ERROR_CODE,
"is_transient": "true",
}
}
raise FacebookRequestError(str(exc), {}, 400, None, json.dumps(body))

self._update_insights_throttle_limit(response)
self._handle_call_rate_limit(response, params)
return response
Expand All @@ -164,6 +190,7 @@ def __init__(self, account_id: str, access_token: str):
self._account_id = account_id
# design flaw in MyFacebookAdsApi requires such strange set of new default api instance
self.api = MyFacebookAdsApi.init(access_token=access_token, crash_log=False)
MyFacebookAdsApi.set_access_token(access_token)
FacebookAdsApi.set_default_api(self.api)

@cached_property
Expand Down