diff --git a/README.md b/README.md index ca4a565..af315f0 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,8 @@ # Fivetran Async Provider for Apache Airflow This package provides an async operator, sensor and hook that integrates [Fivetran](https://fivetran.com) into Apache Airflow. -`FivetranSensorAsync` allows you to monitor a Fivetran sync job for completion before running downstream processes. -`FivetranOperatorAsync` submits a Fivetran sync job and polls for its status on the triggerer. +`FivetranSensor` allows you to monitor a Fivetran sync job for completion before running downstream processes. +`FivetranOperator` submits a Fivetran sync job and polls for its status on the triggerer. Since an async sensor or operator frees up worker slot while polling is happening on the triggerer, they consume less resources when compared to traditional "sync" sensors and operators. @@ -33,30 +33,30 @@ The sensor assumes the `Conn Id` is set to `fivetran`, however if you are managi ### [Fivetran Operator Async](https://github.com/astronomer/airflow-provider-fivetran-async/tree/main/fivetran_provider_async/operators.py) -`FivetranOperatorAsync` submits a Fivetran sync job and monitors it on trigger for completion. +`FivetranOperator` submits a Fivetran sync job and monitors it on trigger for completion. It requires that you specify the `connector_id` of the sync job to start. You can find `connector_id` in the Settings page of the connector you configured in the [Fivetran dashboard](https://fivetran.com/dashboard/connectors). Import into your DAG via: ```python -from fivetran_provider_async.operators import FivetranOperatorAsync +from fivetran_provider_async.operators import FivetranOperator ``` ### [Fivetran Sensor Async](https://github.com/astronomer/airflow-provider-fivetran-async/tree/main/fivetran_provider_async/sensors.py) -`FivetranSensorAsync` monitors a Fivetran sync job for completion. -Monitoring with `FivetranSensorAsync` allows you to trigger downstream processes only when the Fivetran sync jobs have completed, ensuring data consistency. +`FivetranSensor` monitors a Fivetran sync job for completion. +Monitoring with `FivetranSensor` allows you to trigger downstream processes only when the Fivetran sync jobs have completed, ensuring data consistency. -You can use multiple instances of `FivetranSensorAsync` to monitor multiple Fivetran connectors. +You can use multiple instances of `FivetranSensor` to monitor multiple Fivetran connectors. If used in this way, -`FivetranSensorAsync` requires that you specify the `connector_id` of the sync job to start. You can find `connector_id` in the Settings page of the connector you configured in the [Fivetran dashboard](https://fivetran.com/dashboard/connectors). +`FivetranSensor` requires that you specify the `connector_id` of the sync job to start. You can find `connector_id` in the Settings page of the connector you configured in the [Fivetran dashboard](https://fivetran.com/dashboard/connectors). Import into your DAG via: ```python -from fivetran_provider_async.sensors import FivetranSensorAsync +from fivetran_provider_async.sensors import FivetranSensor ``` ## Examples diff --git a/fivetran_provider_async/example_dags/example_fivetran.py b/fivetran_provider_async/example_dags/example_fivetran.py index 6c1c8cc..c5dde05 100644 --- a/fivetran_provider_async/example_dags/example_fivetran.py +++ b/fivetran_provider_async/example_dags/example_fivetran.py @@ -22,6 +22,7 @@ task_id="fivetran-task", fivetran_conn_id="fivetran_default", connector_id="{{ var.value.connector_id }}", + deferrable=False, ) fivetran_sync_wait = FivetranSensor( @@ -29,6 +30,7 @@ fivetran_conn_id="fivetran_default", connector_id="{{ var.value.connector_id }}", poke_interval=5, + deferrable=False, ) fivetran_sync_start >> fivetran_sync_wait diff --git a/fivetran_provider_async/example_dags/example_fivetran_async.py b/fivetran_provider_async/example_dags/example_fivetran_async.py index 63a0c8a..4f12b75 100644 --- a/fivetran_provider_async/example_dags/example_fivetran_async.py +++ b/fivetran_provider_async/example_dags/example_fivetran_async.py @@ -2,8 +2,8 @@ from airflow import DAG -from fivetran_provider_async.operators import FivetranOperator, FivetranOperatorAsync -from fivetran_provider_async.sensors import FivetranSensorAsync +from fivetran_provider_async.operators import FivetranOperator +from fivetran_provider_async.sensors import FivetranSensor default_args = { "owner": "Airflow", @@ -19,7 +19,7 @@ ) with dag: - fivetran_async_op = FivetranOperatorAsync( + fivetran_async_op = FivetranOperator( task_id="fivetran_async_op", connector_id="bronzing_largely", ) @@ -27,9 +27,10 @@ fivetran_sync_op = FivetranOperator( task_id="fivetran_sync_op", connector_id="bronzing_largely", + deferrable=False, ) - fivetran_async_sensor = FivetranSensorAsync( + fivetran_async_sensor = FivetranSensor( task_id="fivetran_async_sensor", connector_id="bronzing_largely", poke_interval=5, diff --git a/fivetran_provider_async/operators.py b/fivetran_provider_async/operators.py index 21c51d5..39bf0ea 100644 --- a/fivetran_provider_async/operators.py +++ b/fivetran_provider_async/operators.py @@ -1,9 +1,12 @@ -from typing import Any, Dict, Optional +from __future__ import annotations + +from typing import TYPE_CHECKING, Any, Dict, Optional from airflow.exceptions import AirflowException from airflow.models import BaseOperator, BaseOperatorLink -from airflow.utils.context import Context -from airflow.utils.decorators import apply_defaults + +if TYPE_CHECKING: + from airflow.utils.context import Context from fivetran_provider_async.hooks import FivetranHook from fivetran_provider_async.triggers import FivetranTrigger @@ -24,75 +27,12 @@ def get_link(self, operator, dttm): class FivetranOperator(BaseOperator): """ - `FivetranOperator` starts a Fivetran sync job. - - `FivetranOperator` requires that you specify the `connector_id` of the sync job to - start. You can find `connector_id` in the Settings page of the connector you - configured in the `Fivetran dashboard `_. - Note that when a Fivetran sync job is controlled via an Operator, it is no longer - run on the schedule as managed by Fivetran. In other words, it is now scheduled only - from Airflow. This can be changed with the schedule_type parameter. - - :param fivetran_conn_id: `Conn ID` of the Connection to be used to configure - the hook. - :type fivetran_conn_id: Optional[str] - :param fivetran_retry_limit: # of retries when encountering API errors - :type fivetran_retry_limit: Optional[int] - :param fivetran_retry_delay: Time to wait before retrying API request - :type fivetran_retry_delay: int - :param connector_id: ID of the Fivetran connector to sync, found on the - Connector settings page. - :type connector_id: str - :param schedule_type: schedule type. Default is "manual" which takes the connector off Fivetran schedule. - Set to "auto" to keep connector on Fivetran schedule. - :type schedule_type: str - """ - - operator_extra_links = (RegistryLink(),) - - # Define which fields get jinjaified - template_fields = ["connector_id"] - - @apply_defaults - def __init__( - self, - connector_id: str, - run_name: Optional[str] = None, - timeout_seconds: Optional[int] = None, - fivetran_conn_id: str = "fivetran", - fivetran_retry_limit: int = 3, - fivetran_retry_delay: int = 1, - poll_frequency: int = 15, - schedule_type: str = "manual", - **kwargs, - ): - super().__init__(**kwargs) - self.fivetran_conn_id = fivetran_conn_id - self.fivetran_retry_limit = fivetran_retry_limit - self.fivetran_retry_delay = fivetran_retry_delay - self.connector_id = connector_id - self.poll_frequency = poll_frequency - self.schedule_type = schedule_type - - def _get_hook(self) -> FivetranHook: - return FivetranHook( - self.fivetran_conn_id, - retry_limit=self.fivetran_retry_limit, - retry_delay=self.fivetran_retry_delay, - ) - - def execute(self, context): - hook = self._get_hook() - hook.prep_connector(self.connector_id, self.schedule_type) - return hook.start_fivetran_sync(self.connector_id) - - -class FivetranOperatorAsync(FivetranOperator): - """ - `FivetranOperatorAsync` submits a Fivetran sync job , and polls for its status on the - airflow trigger.`FivetranOperatorAsync` requires that you specify the `connector_id` of + `FivetranOperator` submits a Fivetran sync job , and polls for its status on the + airflow trigger.`FivetranOperator` requires that you specify the `connector_id` of the sync job to start. You can find `connector_id` in the Settings page of the connector you configured in the `Fivetran dashboard `_. + If you do not want to run `FivetranOperator` in async mode you can set `deferrable` to + False in operator. :param fivetran_conn_id: `Conn ID` of the Connection to be used to configure the hook. :param fivetran_retry_limit: # of retries when encountering API errors @@ -104,8 +44,13 @@ class FivetranOperatorAsync(FivetranOperator): :param poll_frequency: Time in seconds that the job should wait in between each try. :param reschedule_wait_time: Optional, if connector is in reset state, number of seconds to wait before restarting the sync. + :param deferrable: Run operator in deferrable mode. Default is True. """ + operator_extra_links = (RegistryLink(),) + + template_fields = ["connector_id"] + def __init__( self, connector_id: str, @@ -117,6 +62,7 @@ def __init__( poll_frequency: int = 15, schedule_type: str = "manual", reschedule_wait_time: int = 0, + deferrable: bool = True, **kwargs, ): self.connector_id = connector_id @@ -128,38 +74,41 @@ def __init__( self.poll_frequency = poll_frequency self.schedule_type = schedule_type self.reschedule_wait_time = reschedule_wait_time - super().__init__( - connector_id=self.connector_id, - run_name=self.run_name, - timeout_seconds=self.timeout_seconds, - fivetran_conn_id=self.fivetran_conn_id, - fivetran_retry_limit=self.fivetran_retry_limit, - fivetran_retry_delay=self.fivetran_retry_delay, - poll_frequency=self.poll_frequency, - schedule_type=self.schedule_type, - **kwargs, - ) + self.deferrable = deferrable + super().__init__(**kwargs) - def execute(self, context: Dict[str, Any]) -> None: + def execute(self, context: Context) -> None | str: """Start the sync using synchronous hook""" hook = self._get_hook() hook.prep_connector(self.connector_id, self.schedule_type) - hook.start_fivetran_sync(self.connector_id) - - # Defer and poll the sync status on the Triggerer - self.defer( - timeout=self.execution_timeout, - trigger=FivetranTrigger( - task_id=self.task_id, - fivetran_conn_id=self.fivetran_conn_id, - connector_id=self.connector_id, - poke_interval=self.poll_frequency, - reschedule_wait_time=self.reschedule_wait_time, - ), - method_name="execute_complete", + last_sync = hook.start_fivetran_sync(self.connector_id) + + if not self.deferrable: + return last_sync + else: + previous_completed_at = hook.get_last_sync(self.connector_id) + completed = hook.get_sync_status(self.connector_id, previous_completed_at) + if not completed: + self.defer( + timeout=self.execution_timeout, + trigger=FivetranTrigger( + task_id=self.task_id, + fivetran_conn_id=self.fivetran_conn_id, + connector_id=self.connector_id, + poke_interval=self.poll_frequency, + reschedule_wait_time=self.reschedule_wait_time, + ), + method_name="execute_complete", + ) + + def _get_hook(self) -> FivetranHook: + return FivetranHook( + self.fivetran_conn_id, + retry_limit=self.fivetran_retry_limit, + retry_delay=self.fivetran_retry_delay, ) - def execute_complete(self, context: "Context", event: Optional[Dict[Any, Any]] = None) -> None: + def execute_complete(self, context: Context, event: Optional[Dict[Any, Any]] = None) -> None: """ Callback for when the trigger fires - returns immediately. Relies on trigger to throw an exception, otherwise it assumes execution was @@ -242,3 +191,18 @@ def get_openlineage_facets_on_start(self): def get_openlineage_facets_on_complete(self, task_instance): return self.get_openlineage_facets_on_start() + + +class FivetranOperatorAsync(FivetranOperator): + """This operator has been deprecated. Please use `FivetranOperator`.""" + + def __init__(self, *args, **kwargs): + import warnings + + super().__init__(*args, **kwargs) + + warnings.warn( + "FivetranOperatorAsync has been deprecated. Please use `FivetranOperator`.", + DeprecationWarning, + stacklevel=2, + ) diff --git a/fivetran_provider_async/sensors.py b/fivetran_provider_async/sensors.py index fe11876..c9da767 100644 --- a/fivetran_provider_async/sensors.py +++ b/fivetran_provider_async/sensors.py @@ -1,9 +1,12 @@ -from typing import Any, Dict, Optional +from __future__ import annotations + +from typing import TYPE_CHECKING, Any from airflow.exceptions import AirflowException from airflow.sensors.base import BaseSensorOperator -from airflow.utils.context import Context -from airflow.utils.decorators import apply_defaults + +if TYPE_CHECKING: + from airflow.utils.context import Context from fivetran_provider_async.hooks import FivetranHook from fivetran_provider_async.triggers import FivetranTrigger @@ -11,45 +14,32 @@ class FivetranSensor(BaseSensorOperator): """ - `FivetranSensor` monitors a Fivetran sync job for completion. - + `FivetranSensor` asynchronously monitors a Fivetran sync job for completion. Monitoring with `FivetranSensor` allows you to trigger downstream processes only when the Fivetran sync jobs have completed, ensuring data consistency. You can use multiple instances of `FivetranSensor` to monitor multiple Fivetran - connectors. Note, it is possible to monitor a sync that is scheduled and managed - from Fivetran; in other words, you can use `FivetranSensor` without using - `FivetranOperator`. If used in this way, your DAG will wait until the sync job - starts on its Fivetran-controlled schedule and then completes. `FivetranSensor` - requires that you specify the `connector_id` of the sync job to start. You can - find `connector_id` in the Settings page of the connector you configured in the + connectors. `FivetranSensor` requires that you specify the `connector_id` of the sync + job to start. You can find `connector_id` in the Settings page of the connector you configured in the `Fivetran dashboard `_. + If you do not want to run `FivetranSensor` in async mode you can set `deferrable` to + False in sensor. :param fivetran_conn_id: `Conn ID` of the Connection to be used to configure the hook. - :type fivetran_conn_id: str :param connector_id: ID of the Fivetran connector to sync, found on the Connector settings page in the Fivetran Dashboard. - :type connector_id: str :param poke_interval: Time in seconds that the job should wait in between each tries - :type poke_interval: int :param fivetran_retry_limit: # of retries when encountering API errors - :type fivetran_retry_limit: Optional[int] :param fivetran_retry_delay: Time to wait before retrying API request - :type fivetran_retry_delay: int - :param xcom: If used, FivetranSensor receives timestamp of previously - completed sync from FivetranOperator via XCOM - :type xcom: str - :param reschedule_time: Optional, if connector is in reset state + :param reschedule_wait_time: Optional, if connector is in reset state number of seconds to wait before restarting, else Fivetran suggestion used - :type reschedule_time: int + :param deferrable: Run sensor in deferrable mode. default is True. """ - # Define which fields get jinjaified template_fields = ["connector_id", "xcom"] - @apply_defaults def __init__( self, connector_id: str, @@ -58,10 +48,11 @@ def __init__( fivetran_retry_limit: int = 3, fivetran_retry_delay: int = 1, xcom: str = "", + reschedule_wait_time: int = 0, reschedule_time: int = 0, + deferrable: bool = True, **kwargs: Any, ) -> None: - super().__init__(**kwargs) self.fivetran_conn_id = fivetran_conn_id self.connector_id = connector_id self.poke_interval = poke_interval @@ -70,7 +61,29 @@ def __init__( self.fivetran_retry_delay = fivetran_retry_delay self.hook = None self.xcom = xcom + self.reschedule_wait_time = reschedule_wait_time self.reschedule_time = reschedule_time + self.deferrable = deferrable + super().__init__(**kwargs) + + def execute(self, context: Context) -> None: + """Check for the target_status and defers using the trigger""" + if not self.deferrable: + super().execute(context=context) + elif not self.poke(context): + self.defer( + timeout=self.execution_timeout, + trigger=FivetranTrigger( + task_id=self.task_id, + fivetran_conn_id=self.fivetran_conn_id, + connector_id=self.connector_id, + previous_completed_at=self.previous_completed_at, + xcom=self.xcom, + poke_interval=self.poke_interval, + reschedule_wait_time=self.reschedule_wait_time, + ), + method_name="execute_complete", + ) def _get_hook(self) -> FivetranHook: if self.hook is None: @@ -87,78 +100,7 @@ def poke(self, context): self.previous_completed_at = hook.get_last_sync(self.connector_id, self.xcom) return hook.get_sync_status(self.connector_id, self.previous_completed_at, self.reschedule_time) - -class FivetranSensorAsync(FivetranSensor): - """ - `FivetranSensorAsync` asynchronously monitors a Fivetran sync job for completion. - Monitoring with `FivetranSensorAsync` allows you to trigger downstream processes only - when the Fivetran sync jobs have completed, ensuring data consistency. You can - use multiple instances of `FivetranSensorAsync` to monitor multiple Fivetran - connectors. `FivetranSensorAsync` requires that you specify the `connector_id` of the sync - job to start. You can find `connector_id` in the Settings page of the connector you configured in the - `Fivetran dashboard `_. - - - :param fivetran_conn_id: `Conn ID` of the Connection to be used to configure - the hook. - :param connector_id: ID of the Fivetran connector to sync, found on the - Connector settings page in the Fivetran Dashboard. - :param poke_interval: Time in seconds that the job should wait in - between each tries - :param fivetran_retry_limit: # of retries when encountering API errors - :param fivetran_retry_delay: Time to wait before retrying API request - :param reschedule_wait_time: Optional, if connector is in reset state - number of seconds to wait before restarting, else Fivetran suggestion used - """ - - @apply_defaults - def __init__( - self, - connector_id: str, - fivetran_conn_id: str = "fivetran", - poke_interval: int = 60, - fivetran_retry_limit: int = 3, - fivetran_retry_delay: int = 1, - xcom: str = "", - reschedule_wait_time: int = 0, - **kwargs: Any, - ) -> None: - self.fivetran_conn_id = fivetran_conn_id - self.connector_id = connector_id - self.poke_interval = poke_interval - self.previous_completed_at = None - self.fivetran_retry_limit = fivetran_retry_limit - self.fivetran_retry_delay = fivetran_retry_delay - self.hook = None - self.xcom = xcom - self.reschedule_wait_time = reschedule_wait_time - super().__init__( - connector_id=self.connector_id, - fivetran_conn_id=self.fivetran_conn_id, - poke_interval=self.poke_interval, - fivetran_retry_limit=self.fivetran_retry_limit, - fivetran_retry_delay=self.fivetran_retry_delay, - xcom=self.xcom, - **kwargs, - ) - - def execute(self, context: Dict[str, Any]) -> None: - """Check for the target_status and defers using the trigger""" - self.defer( - timeout=self.execution_timeout, - trigger=FivetranTrigger( - task_id=self.task_id, - fivetran_conn_id=self.fivetran_conn_id, - connector_id=self.connector_id, - previous_completed_at=self.previous_completed_at, - xcom=self.xcom, - poke_interval=self.poke_interval, - reschedule_wait_time=self.reschedule_wait_time, - ), - method_name="execute_complete", - ) - - def execute_complete(self, context: "Context", event: Optional[Dict[Any, Any]] = None) -> None: + def execute_complete(self, context: Context, event: dict[Any, Any] | None = None) -> None: """ Callback for when the trigger fires - returns immediately. Relies on trigger to throw an exception, otherwise it assumes execution was @@ -172,3 +114,20 @@ def execute_complete(self, context: "Context", event: Optional[Dict[Any, Any]] = self.log.info( event["message"], ) + + +class FivetranSensorAsync(FivetranSensor): + """This sensor has been deprecated. Please use `FivetranSensor`.""" + + template_fields = ["connector_id", "xcom"] + + def __init__(self, *args, **kwargs: Any) -> None: + import warnings + + super().__init__(*args, **kwargs) + + warnings.warn( + "FivetranSensorAsync has been deprecated. Please use `FivetranSensor`.", + DeprecationWarning, + stacklevel=2, + ) diff --git a/tests/operators/test_fivetran.py b/tests/operators/test_fivetran.py index ddba1f7..46dedb3 100644 --- a/tests/operators/test_fivetran.py +++ b/tests/operators/test_fivetran.py @@ -6,7 +6,7 @@ import requests_mock from airflow.exceptions import AirflowException, TaskDeferred -from fivetran_provider_async.operators import FivetranOperator, FivetranOperatorAsync +from fivetran_provider_async.operators import FivetranOperator from tests.common.static import ( MOCK_FIVETRAN_DESTINATIONS_RESPONSE_PAYLOAD_SHEETS, MOCK_FIVETRAN_GROUPS_RESPONSE_PAYLOAD_SHEETS, @@ -64,7 +64,7 @@ def context(): @mock.patch.dict("os.environ", AIRFLOW_CONN_CONN_FIVETRAN="http://API_KEY:API_SECRET@") -class TestFivetranOperatorAsync(unittest.TestCase): +class TestFivetranOperator(unittest.TestCase): @requests_mock.mock() def test_fivetran_op_async_execute_success(self, m): """Tests that task gets deferred after job submission""" @@ -78,7 +78,7 @@ def test_fivetran_op_async_execute_success(self, m): json=MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS, ) - task = FivetranOperatorAsync( + task = FivetranOperator( task_id="fivetran_op_async", fivetran_conn_id="conn_fivetran", connector_id="interchangeable_revenge", @@ -99,7 +99,7 @@ def test_fivetran_op_async_execute_success_reschedule_wait_time_and_manual_mode( json=MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS, ) - task = FivetranOperatorAsync( + task = FivetranOperator( task_id="fivetran_op_async", fivetran_conn_id="conn_fivetran", connector_id="interchangeable_revenge", @@ -111,7 +111,7 @@ def test_fivetran_op_async_execute_success_reschedule_wait_time_and_manual_mode( def test_fivetran_op_async_execute_complete_error(self): """Tests that execute_complete method raises exception in case of error""" - task = FivetranOperatorAsync( + task = FivetranOperator( task_id="fivetran_op_async", fivetran_conn_id="conn_fivetran", connector_id="interchangeable_revenge", @@ -123,7 +123,7 @@ def test_fivetran_op_async_execute_complete_error(self): def test_fivetran_op_async_execute_complete_success(self): """Tests that execute_complete method returns expected result and that it prints expected log""" - task = FivetranOperatorAsync( + task = FivetranOperator( task_id="fivetran_op_async", fivetran_conn_id="conn_fivetran", connector_id="interchangeable_revenge", @@ -172,7 +172,7 @@ def test_fivetran_operator_get_openlineage_facets_on_start(self, m): json=MOCK_FIVETRAN_GROUPS_RESPONSE_PAYLOAD_SHEETS, ) - operator = FivetranOperatorAsync( + operator = FivetranOperator( task_id="fivetran-task", fivetran_conn_id="conn_fivetran", connector_id="interchangeable_revenge", @@ -185,39 +185,3 @@ def test_fivetran_operator_get_openlineage_facets_on_start(self, m): assert schema_field.name == "column_1_dest" assert schema_field.type == "VARCHAR(256)" assert schema_field.description is None - - -# Mock the `conn_fivetran` Airflow connection (note the `@` after `API_SECRET`) -@mock.patch.dict("os.environ", AIRFLOW_CONN_CONN_FIVETRAN="http://API_KEY:API_SECRET@") -class TestFivetranOperator(unittest.TestCase): - """ - Test functions for Fivetran Operator. - - Mocks responses from Fivetran API. - """ - - @requests_mock.mock() - def test_fivetran_operator(self, m): - m.get( - "https://api.fivetran.com/v1/connectors/interchangeable_revenge", - json=MOCK_FIVETRAN_RESPONSE_PAYLOAD, - ) - m.patch( - "https://api.fivetran.com/v1/connectors/interchangeable_revenge", - json=MOCK_FIVETRAN_RESPONSE_PAYLOAD, - ) - m.post( - "https://api.fivetran.com/v1/connectors/interchangeable_revenge/force", - json=MOCK_FIVETRAN_RESPONSE_PAYLOAD, - ) - - operator = FivetranOperator( - task_id="fivetran-task", - fivetran_conn_id="conn_fivetran", - connector_id="interchangeable_revenge", - ) - - result = operator.execute({}) - log.info(result) - - assert result is not None diff --git a/tests/sensors/test_fivetran.py b/tests/sensors/test_fivetran.py index 74079e8..01f6387 100644 --- a/tests/sensors/test_fivetran.py +++ b/tests/sensors/test_fivetran.py @@ -1,12 +1,10 @@ import logging -import unittest from unittest import mock import pytest -import requests_mock from airflow.exceptions import AirflowException, TaskDeferred -from fivetran_provider_async.sensors import FivetranSensor, FivetranSensorAsync +from fivetran_provider_async.sensors import FivetranSensor from fivetran_provider_async.triggers import FivetranTrigger TASK_ID = "fivetran_sensor_check" @@ -57,11 +55,13 @@ def context(): yield context -class TestFivetranSensorAsync: - def test_fivetran_sensor_async(self): +class TestFivetranSensor: + @mock.patch("fivetran_provider_async.sensors.FivetranSensor.poke") + def test_fivetran_sensor_async(self, mock_poke): """Asserts that a task is deferred and a FivetranTrigger will be fired when the FivetranSensorAsync is executed.""" - task = FivetranSensorAsync( + mock_poke.return_value = False + task = FivetranSensor( task_id=TASK_ID, fivetran_conn_id="fivetran_default", connector_id="test_connector", @@ -71,10 +71,12 @@ def test_fivetran_sensor_async(self): task.execute(context) assert isinstance(exc.value.trigger, FivetranTrigger), "Trigger is not a FivetranTrigger" - def test_fivetran_sensor_async_with_response_wait_time(self): + @mock.patch("fivetran_provider_async.sensors.FivetranSensor.poke") + def test_fivetran_sensor_async_with_response_wait_time(self, mock_poke): """Asserts that a task is deferred and a FivetranTrigger will be fired when the FivetranSensorAsync is executed when reschedule_wait_time is specified.""" - task = FivetranSensorAsync( + mock_poke.return_value = False + task = FivetranSensor( task_id=TASK_ID, fivetran_conn_id="fivetran_default", connector_id="test_connector", @@ -87,7 +89,7 @@ def test_fivetran_sensor_async_with_response_wait_time(self): def test_fivetran_sensor_async_execute_failure(self, context): """Tests that an AirflowException is raised in case of error event""" - task = FivetranSensorAsync( + task = FivetranSensor( task_id=TASK_ID, fivetran_conn_id="fivetran_default", connector_id="test_connector", @@ -101,7 +103,7 @@ def test_fivetran_sensor_async_execute_failure(self, context): def test_fivetran_sensor_async_execute_complete(self): """Asserts that logging occurs as expected""" - task = FivetranSensorAsync( + task = FivetranSensor( task_id=TASK_ID, fivetran_conn_id="fivetran_default", connector_id="test_connector", @@ -112,25 +114,3 @@ def test_fivetran_sensor_async_execute_complete(self): context=None, event={"status": "success", "message": "Fivetran connector finished syncing"} ) mock_log_info.assert_called_with("Fivetran connector finished syncing") - - -# Mock the `conn_fivetran` Airflow connection (note the `@` after `API_SECRET`) -@mock.patch.dict("os.environ", AIRFLOW_CONN_CONN_FIVETRAN="http://API_KEY:API_SECRET@") -class TestFivetranSensor(unittest.TestCase): - """ - Test functions for Fivetran Operator. - - Mocks responses from Fivetran API. - """ - - @mock.patch.object(FivetranSensor, "poke", "returned_sync_status") - @requests_mock.mock() - def test_del(self, m): - sensor = FivetranSensor( - task_id="my_fivetran_sensor", - fivetran_conn_id="conn_fivetran", - connector_id="interchangeable_revenge", - ) - - log.info(sensor.poke) - assert sensor.poke == "returned_sync_status"