| @abc.abstractmethod
-def get_access_token(self) -> str:
- pass
+ | @abc.abstractmethod
+def get_access_token(self) -> str:
+ pass
|
@@ -1096,7 +1161,7 @@
-get_airflow_api_url() -> str
+ get_airflow_api_url() -> str
abstractmethod
@@ -1109,11 +1174,11 @@
Source code in brickflow_plugins/airflow/operators/external_tasks.py
- | @abc.abstractmethod
-def get_airflow_api_url(self) -> str:
- pass
+ | @abc.abstractmethod
+def get_airflow_api_url(self) -> str:
+ pass
|
@@ -1126,7 +1191,7 @@
-get_version() -> str
+ get_version() -> str
abstractmethod
@@ -1139,11 +1204,11 @@
Source code in brickflow_plugins/airflow/operators/external_tasks.py
- | @abc.abstractmethod
-def get_version(self) -> str:
- pass
+ | @abc.abstractmethod
+def get_version(self) -> str:
+ pass
|
@@ -1156,6 +1221,7 @@
@@ -1163,7 +1229,7 @@
-brickflow_plugins.airflow.operators.external_tasks.AirflowProxyOktaClusterAuth(oauth2_conn_id: str, airflow_cluster_url: str, airflow_version: str = None, get_airflow_version_callback: Callable[[str, str], str] = None)
+ brickflow_plugins.airflow.operators.external_tasks.AirflowProxyOktaClusterAuth(oauth2_conn_id: str, airflow_cluster_url: str, airflow_version: str = None, get_airflow_version_callback: Callable[[str, str], str] = None)
@@ -1173,10 +1239,9 @@ AirflowClusterAuth
-
- Source code in brickflow_plugins/airflow/operators/external_tasks.py
- 54
-55
+
+ Source code in brickflow_plugins/airflow/operators/external_tasks.py
+ | def __init__(
- self,
- oauth2_conn_id: str,
- airflow_cluster_url: str,
- airflow_version: str = None,
- get_airflow_version_callback: Callable[[str, str], str] = None,
-):
- self._airflow_version = airflow_version
- self._get_airflow_version_callback = get_airflow_version_callback
- self._oauth2_conn_id = oauth2_conn_id
- self._airflow_url = airflow_cluster_url.rstrip("/")
- if airflow_version is None and get_airflow_version_callback is None:
- raise Exception(
- "Either airflow_version or get_airflow_version_callback must be provided"
- )
+68
+69
| def __init__(
+ self,
+ oauth2_conn_id: str,
+ airflow_cluster_url: str,
+ airflow_version: str = None,
+ get_airflow_version_callback: Callable[[str, str], str] = None,
+):
+ self._airflow_version = airflow_version
+ self._get_airflow_version_callback = get_airflow_version_callback
+ self._oauth2_conn_id = oauth2_conn_id
+ self._airflow_url = airflow_cluster_url.rstrip("/")
+ if airflow_version is None and get_airflow_version_callback is None:
+ raise Exception(
+ "Either airflow_version or get_airflow_version_callback must be provided"
+ )
|
-
+
@@ -1225,7 +1291,7 @@
-get_access_token() -> str
+ get_access_token() -> str
@@ -1234,8 +1300,7 @@
Source code in brickflow_plugins/airflow/operators/external_tasks.py
- 85
- 86
+ | def get_access_token(self) -> str:
- okta_url = self.get_okta_url()
- client_id = self.get_okta_client_id()
- client_secret = self.get_okta_client_secret()
-
- payload = (
- "client_id="
- + client_id
- + "&client_secret="
- + client_secret
- + "&grant_type=client_credentials"
- )
- headers = {
- "Content-Type": "application/x-www-form-urlencoded",
- "cache-control": "no-cache",
- }
- response = requests.post(okta_url, data=payload, headers=headers, timeout=600)
- if (
- response.status_code < HTTPStatus.OK
- or response.status_code > HTTPStatus.PARTIAL_CONTENT
- ):
- log.error(
- "Failed request to Okta for JWT status_code={} response={} client_id={}".format(
- response.status_code, response.text, client_id
- )
- )
- token_data = response.json()["access_token"]
- return token_data
+112
+113
| def get_access_token(self) -> str:
+ okta_url = self.get_okta_url()
+ client_id = self.get_okta_client_id()
+ client_secret = self.get_okta_client_secret()
+
+ payload = (
+ "client_id="
+ + client_id
+ + "&client_secret="
+ + client_secret
+ + "&grant_type=client_credentials"
+ )
+ headers = {
+ "Content-Type": "application/x-www-form-urlencoded",
+ "cache-control": "no-cache",
+ }
+ response = requests.post(okta_url, data=payload, headers=headers, timeout=600)
+ if (
+ response.status_code < HTTPStatus.OK
+ or response.status_code > HTTPStatus.PARTIAL_CONTENT
+ ):
+ log.error(
+ "Failed request to Okta for JWT status_code={} response={} client_id={}".format(
+ response.status_code, response.text, client_id
+ )
+ )
+ token_data = response.json()["access_token"]
+ return token_data
|
@@ -1301,7 +1367,7 @@
-get_airflow_api_url() -> str
+ get_airflow_api_url() -> str
@@ -1310,11 +1376,11 @@
Source code in brickflow_plugins/airflow/operators/external_tasks.py
- | def get_airflow_api_url(self) -> str:
- # TODO: templatize this to a env variable
- return self._airflow_url
+ | def get_airflow_api_url(self) -> str:
+ # TODO: templatize this to a env variable
+ return self._airflow_url
|
@@ -1327,7 +1393,7 @@
-get_okta_client_id() -> str
+ get_okta_client_id() -> str
@@ -1336,9 +1402,9 @@
Source code in brickflow_plugins/airflow/operators/external_tasks.py
- | def get_okta_client_id(self) -> str:
- return self.get_okta_conn().login
+ | def get_okta_client_id(self) -> str:
+ return self.get_okta_conn().login
|
@@ -1351,7 +1417,7 @@
-get_okta_client_secret() -> str
+ get_okta_client_secret() -> str
@@ -1360,9 +1426,9 @@ |
|
|
|
|
|
|