Skip to content

Commit

Permalink
chore: add warning for access expiration date of 1 week (#201)
Browse files Browse the repository at this point in the history
* chore: add warning for access expiration date of 1 week

* chore: use error

* chore: duration 57

* chore: use critical

* chore: use airbyte traced exception

* chore: move check to source level

* chore: move check to source level

* chore: move check to source level

* chore: remove unused imports
  • Loading branch information
am6010 authored Aug 23, 2024
1 parent 4178f8f commit 5a442aa
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import logging
from dataclasses import dataclass
from time import sleep
from typing import Optional

import backoff
import pendulum
Expand Down Expand Up @@ -193,6 +194,17 @@ def call(
self._handle_call_rate_limit(response, params)
return response

def get_access_token_expiration(self) -> Optional[int]:
"""Get access token expiration time"""
try:
response = self.call("GET", "https://graph.facebook.com/debug_token", params={"input_token": MyFacebookAdsApi._access_token})
response_body = response.json()
return response_body.get("data", {}).get("expires_at", None)
except FacebookRequestError:
return None
except requests.exceptions.ConnectionError:
return None


class API:
"""Simple wrapper around Facebook API"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,21 @@
#

import logging
from typing import Any, List, Mapping, Optional, Tuple, Type
from typing import Any, List, Mapping, Optional, Tuple, Type, Union, MutableMapping, Iterator

import facebook_business
import pendulum
import requests
from airbyte_cdk.models import (
AdvancedAuth,
AirbyteStateMessage,
AuthFlowType,
ConnectorSpecification,
DestinationSyncMode,
FailureType,
OAuthConfigSpecification,
ConfiguredAirbyteCatalog,
AirbyteMessage,
)
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
Expand Down Expand Up @@ -272,3 +275,28 @@ def get_custom_insights_streams(self, api: API, config: ConnectorConfig) -> List
)
streams.append(stream)
return streams

def read(
self,
logger: logging.Logger,
config: Mapping[str, Any],
catalog: ConfiguredAirbyteCatalog,
state: Optional[Union[List[AirbyteStateMessage], MutableMapping[str, Any]]] = None,
) -> Iterator[AirbyteMessage]:
# Read records from the source and emit messages
for message in super().read(logger, config, catalog, state):
yield message

# Check if the access token is about to expire
# If it is, raise an exception to notify the user
config = self._validate_and_transform(config)
api = API(account_id=config.account_id, access_token=config.access_token, app_secret=config.client_secret)
expires_at = api.api.get_access_token_expiration()
if expires_at and pendulum.from_timestamp(expires_at) - pendulum.now() < pendulum.duration(days=7):
raise AirbyteTracedException(
message="Access token is about to expire, please re-authenticate",
internal_message="Access token is about to expire, please re-authenticate",
failure_type=FailureType.config_error,
)


0 comments on commit 5a442aa

Please sign in to comment.