def get_connector_id(self, connector_name: str, destination_name: str) -> str:
    """
    Look up a connector's ID from its name and the name of its destination.

    The destination is matched against the ``name`` field of the groups returned by
    ``self._get_groups()``. The connector is matched against the ``schema`` field of
    the connectors returned by ``self.get_connectors()`` for that group. In both
    lookups the first match is used.

    :param connector_name: Value compared with each connector's ``schema`` field.
    :param destination_name: Value compared with each group's ``name`` field.
    :return: The ``id`` of the first matching connector.
    :raises ValueError: If no group or no connector matches, or if the matching
        group or connector carries no usable ``id``.
    """
    group = next((grp for grp in self._get_groups() if grp.get("name") == destination_name), None)
    if not group:
        raise ValueError(f"Destination '{destination_name}' not found.")

    group_id = group.get("id")
    if not group_id:
        # Stop here instead of listing the connectors of a blank group id.
        raise ValueError(f"Destination '{destination_name}' has no id.")

    connector = next(
        (conn for conn in self.get_connectors(group_id=group_id) if conn.get("schema") == connector_name),
        None,
    )
    if not connector:
        raise ValueError(f"Connector '{connector_name}' not found in Destination '{destination_name}'.")

    found_id = connector.get("id")
    if not found_id:
        # An empty string is not a valid result for a method that promises a connector id.
        raise ValueError(f"Connector '{connector_name}' in Destination '{destination_name}' has no id.")

    return found_id
has been completed successfully and is in diff --git a/fivetran_provider_async/operators.py b/fivetran_provider_async/operators.py index 7003415..7a419c8 100644 --- a/fivetran_provider_async/operators.py +++ b/fivetran_provider_async/operators.py @@ -45,7 +45,11 @@ class FivetranOperator(BaseOperator): :param fivetran_retry_delay: Time to wait before retrying API request :param run_name: Fivetran run name :param timeout_seconds: Timeout in seconds - :param connector_id: ID of the Fivetran connector to sync, found on the Connector settings page. + :param connector_id: Optional, ID of the Fivetran connector to sync, found on the Connector settings page. + :param connector_name: Optional, Name of the Fivetran connector to sync, found on the + Connectors page in the Fivetran Dashboard. + :param destination_name: Optional, Destination of the Fivetran connector to sync, found on the + Connectors page in the Fivetran Dashboard. :param schedule_type: schedule type. Default is "manual" which takes the connector off Fivetran schedule. :param poll_frequency: Time in seconds that the job should wait in between each try. 
@cached_property
def connector_id(self) -> str:
    """
    Return the ID of the Fivetran connector this operator acts on.

    A connector ID passed to the constructor is returned unchanged. Otherwise, when
    both ``connector_name`` and ``destination_name`` are set, the ID is looked up
    through ``self.hook.get_connector_id``.

    :return: The connector ID.
    :raises ValueError: If no connector ID was given and ``connector_name`` and
        ``destination_name`` are not both set.
    """
    # NOTE(review): the operator test added alongside this property expects this
    # ValueError from the constructor call itself, which suggests the property is
    # read during construction. If so, the name-based branch would query the
    # Fivetran API at that point - confirm.
    explicit_id = self._connector_id
    if explicit_id:
        return explicit_id

    if not (self.connector_name and self.destination_name):
        raise ValueError("No value specified for connector_id or to both connector_name and destination_name")

    return self.hook.get_connector_id(
        connector_name=self.connector_name,
        destination_name=self.destination_name,
    )
diff --git a/fivetran_provider_async/sensors.py b/fivetran_provider_async/sensors.py index 0f268c6..707bbb0 100644 --- a/fivetran_provider_async/sensors.py +++ b/fivetran_provider_async/sensors.py @@ -2,7 +2,7 @@ from datetime import datetime from functools import cached_property -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, Optional from airflow.exceptions import AirflowException from airflow.sensors.base import BaseSensorOperator @@ -37,8 +37,9 @@ class FivetranSensor(BaseSensorOperator): `completed_after_time` is `2020-01-01 03:00:00`, then the sensor will stop waiting at `03:05:00`. - `FivetranSensor` requires that you specify the `connector_id` of the sync - job to start. You can find `connector_id` in the Settings page of the + `FivetranSensor` requires that you specify either the `connector_id` or + both `connector_name` and `destination_name` of the sync job to start. + You can find `connector_id` in the Settings page of the connector you configured in the `Fivetran dashboard `_. @@ -47,8 +48,12 @@ class FivetranSensor(BaseSensorOperator): :param fivetran_conn_id: `Conn ID` of the Connection to be used to configure the hook. - :param connector_id: ID of the Fivetran connector to sync, found on the + :param connector_id: Optional. ID of the Fivetran connector to sync, found on the Connector settings page in the Fivetran Dashboard. + :param connector_name: Optional. Name of the Fivetran connector to sync, found on the + Connectors page in the Fivetran Dashboard. + :param destination_name: Optional. Destination of the Fivetran connector to sync, found on the + Connectors page in the Fivetran Dashboard. 
@cached_property
def connector_id(self) -> str:
    """
    Return the ID of the Fivetran connector this sensor checks.

    The connector ID given to the constructor takes precedence. Without one, the
    ID is resolved via ``self.hook.get_connector_id`` from ``connector_name`` and
    ``destination_name``, which must both be set.

    :return: The connector ID.
    :raises ValueError: If no connector ID was given and ``connector_name`` and
        ``destination_name`` are not both set.
    """
    given_id = self._connector_id
    if given_id:
        return given_id

    has_both_names = self.connector_name and self.destination_name
    if not has_both_names:
        raise ValueError("No value specified for connector_id or to both connector_name and destination_name")

    return self.hook.get_connector_id(
        connector_name=self.connector_name,
        destination_name=self.destination_name,
    )
# Mocked body for GET https://api.fivetran.com/v1/groups/ in the operator tests.
# It holds a single group; its name ("GoogleSheets") is the destination_name the
# test passes, and its id ("rarer_gradient") appears in the mocked connectors URL.
MOCK_FIVETRAN_GROUPS_RESPONSE_PAYLOAD = {
    "code": "Success",
    "data": {
        "items": [
            {
                "id": "rarer_gradient",
                "name": "GoogleSheets",
                "created_at": "2022-12-12T17:14:33.790844Z",
            },
        ]
    },
}

# Mocked body for GET https://api.fivetran.com/v1/groups/rarer_gradient/connectors/.
# The first item's "schema" equals the connector_name used by the test, and its id
# ("interchangeable_revenge") is the one in the mocked /v1/connectors/... URLs.
# The second item has a different schema, so the lookup has a non-matching entry.
MOCK_FIVETRAN_CONNECTORS_RESPONSE_PAYLOAD = {
    "code": "Success",
    "data": {
        "items": [
            {
                "id": "interchangeable_revenge",
                "group_id": "rarer_gradient",
                "service": "google_sheets",
                "service_version": 1,
                "schema": "google_sheets.fivetran_google_sheets_spotify",
                "connected_by": "mournful_shalt",
                "created_at": "2021-03-05T22:58:56.238875Z",
                "succeeded_at": "2021-03-23T20:55:12.670390Z",
                # NOTE(review): this is the string "null", not None - confirm it is intended.
                "failed_at": "null",
                "sync_frequency": 360,
                "status": {
                    "setup_state": "connected",
                    "sync_state": "scheduled",
                    "update_state": "on_schedule",
                    "is_historical_sync": False,
                    "tasks": [],
                    "warnings": [],
                },
            },
            {
                "id": "wicked_impressive",
                "group_id": "rarer_gradient",
                "service": "netsuite",
                "service_version": 1,
                "schema": "netsuite",
                "connected_by": "concerning_batch",
                "created_at": "2018-07-21T22:55:21.724201Z",
                "succeeded_at": "2018-12-26T17:58:18.245Z",
                "failed_at": "2018-08-24T15:24:58.872491Z",
                "sync_frequency": 60,
                "status": {
                    "setup_state": "connected",
                    "sync_state": "paused",
                    "update_state": "delayed",
                    "is_historical_sync": False,
                    "tasks": [],
                    "warnings": [],
                },
            },
        ]
    },
}
@requests_mock.mock()
def test_fivetran_op_without_connector_id(self, m):
    """
    Tests the operator when it is given connector_name and destination_name instead of connector_id.

    execute() must defer, and execute_complete() must return the event's return_value
    and log the event's message.
    """
    # Endpoints for the name-based lookup: the group list, then that group's connectors.
    m.get(
        "https://api.fivetran.com/v1/groups/",
        json=MOCK_FIVETRAN_GROUPS_RESPONSE_PAYLOAD,
    )
    m.get(
        "https://api.fivetran.com/v1/groups/rarer_gradient/connectors/",
        json=MOCK_FIVETRAN_CONNECTORS_RESPONSE_PAYLOAD,
    )

    # Endpoints for the connector whose schema matches connector_name in the mocked list.
    m.get(
        "https://api.fivetran.com/v1/connectors/interchangeable_revenge",
        json=MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS,
    )

    m.post(
        "https://api.fivetran.com/v1/connectors/interchangeable_revenge/force",
        json=MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS,
    )

    task = FivetranOperator(
        task_id="fivetran_op_async",
        fivetran_conn_id="conn_fivetran",
        connector_name="google_sheets.fivetran_google_sheets_spotify",
        destination_name="GoogleSheets",
        reschedule_wait_time=60,
        schedule_type="manual",
    )
    # NOTE(review): `context` is not a parameter of this test, so this appears to pass
    # the module-level fixture function itself rather than its value - confirm.
    with pytest.raises(TaskDeferred):
        task.execute(context)

    expected_return_value = MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS["data"]["succeeded_at"]

    with mock.patch.object(task.log, "info") as mock_log_info:
        assert (
            task.execute_complete(
                context=context,
                event={
                    "status": "success",
                    "message": "Fivetran sync completed",
                    "return_value": MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS["data"]["succeeded_at"],
                },
            )
            == expected_return_value
        )

    mock_log_info.assert_called_with("Fivetran sync completed")

def test_fivetran_op_without_connector_id_error(self):
    """
    Tests that creating the operator with neither connector_id nor connector_name and
    destination_name raises ValueError with the expected message.
    """
    # The error is expected from the constructor call itself; no task method is invoked.
    with pytest.raises(ValueError) as exc:
        FivetranOperator(
            task_id="fivetran_op_async",
            fivetran_conn_id="conn_fivetran",
        )

    assert (
        str(exc.value)
        == "No value specified for connector_id or to both connector_name and destination_name"
    )