Merge sync and async operator (#40)
pankajastro authored Aug 7, 2023
1 parent 3f3aecf commit 362337a
Showing 7 changed files with 150 additions and 280 deletions.
18 changes: 9 additions & 9 deletions README.md
@@ -1,8 +1,8 @@
# Fivetran Async Provider for Apache Airflow

This package provides an async operator, a sensor, and a hook that integrate [Fivetran](https://fivetran.com) into Apache Airflow.
`FivetranSensorAsync` allows you to monitor a Fivetran sync job for completion before running downstream processes.
`FivetranOperatorAsync` submits a Fivetran sync job and polls for its status on the triggerer.
`FivetranSensor` allows you to monitor a Fivetran sync job for completion before running downstream processes.
`FivetranOperator` submits a Fivetran sync job and polls for its status on the triggerer.
Since async sensors and operators free up the worker slot while polling happens on the triggerer,
they consume fewer resources than traditional "sync" sensors and operators.

@@ -33,30 +33,30 @@ The sensor assumes the `Conn Id` is set to `fivetran`, however if you are managi

### [Fivetran Operator Async](https://github.com/astronomer/airflow-provider-fivetran-async/tree/main/fivetran_provider_async/operators.py)

`FivetranOperatorAsync` submits a Fivetran sync job and monitors it on trigger for completion.
`FivetranOperator` submits a Fivetran sync job and monitors it on trigger for completion.
It requires that you specify the `connector_id` of the sync job to start. You can find `connector_id` in the Settings page of the connector you configured in the [Fivetran dashboard](https://fivetran.com/dashboard/connectors).

Import into your DAG via:
```python
from fivetran_provider_async.operators import FivetranOperatorAsync
from fivetran_provider_async.operators import FivetranOperator
```
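
For context, a minimal DAG using the merged `FivetranOperator` might look like the sketch below. The DAG id and `connector_id` are placeholders, the `schedule` argument assumes Airflow 2.4+, and the keyword arguments mirror the defaults visible in this diff.

```python
from datetime import datetime

from airflow import DAG

from fivetran_provider_async.operators import FivetranOperator

with DAG(
    dag_id="fivetran_operator_example",  # placeholder DAG id
    start_date=datetime(2023, 1, 1),
    schedule=None,  # assumes Airflow >= 2.4; older versions use schedule_interval
    catchup=False,
):
    fivetran_sync_start = FivetranOperator(
        task_id="fivetran-task",
        fivetran_conn_id="fivetran",  # the provider's default Conn Id
        connector_id="my_connector_id",  # placeholder; copy from the connector's Settings page
        # deferrable=True is the default after this change; pass deferrable=False
        # to poll the sync status on the worker instead of the triggerer.
    )
```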

### [Fivetran Sensor Async](https://github.com/astronomer/airflow-provider-fivetran-async/tree/main/fivetran_provider_async/sensors.py)

`FivetranSensorAsync` monitors a Fivetran sync job for completion.
Monitoring with `FivetranSensorAsync` allows you to trigger downstream processes only when the Fivetran sync jobs have completed, ensuring data consistency.
`FivetranSensor` monitors a Fivetran sync job for completion.
Monitoring with `FivetranSensor` allows you to trigger downstream processes only when the Fivetran sync jobs have completed, ensuring data consistency.



You can use multiple instances of `FivetranSensorAsync` to monitor multiple Fivetran connectors.
You can use multiple instances of `FivetranSensor` to monitor multiple Fivetran connectors.

If used in this way,

`FivetranSensorAsync` requires that you specify the `connector_id` of the sync job to start. You can find `connector_id` in the Settings page of the connector you configured in the [Fivetran dashboard](https://fivetran.com/dashboard/connectors).
`FivetranSensor` requires that you specify the `connector_id` of the sync job to start. You can find `connector_id` in the Settings page of the connector you configured in the [Fivetran dashboard](https://fivetran.com/dashboard/connectors).

Import into your DAG via:
```python
from fivetran_provider_async.sensors import FivetranSensorAsync
from fivetran_provider_async.sensors import FivetranSensor
```
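
A matching sketch for the sensor, again with a placeholder `connector_id`; `poke_interval` follows the value used in the provider's example DAG, and `deferrable` is left at its default.

```python
from datetime import datetime

from airflow import DAG

from fivetran_provider_async.sensors import FivetranSensor

with DAG(
    dag_id="fivetran_sensor_example",  # placeholder DAG id
    start_date=datetime(2023, 1, 1),
    schedule=None,  # assumes Airflow >= 2.4
    catchup=False,
):
    fivetran_sync_wait = FivetranSensor(
        task_id="fivetran-sensor",
        fivetran_conn_id="fivetran",
        connector_id="my_connector_id",  # placeholder; must match the connector being synced
        poke_interval=5,  # seconds between status checks
        # deferrable defaults to True; set deferrable=False for a classic poking sensor.
    )
```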

## Examples
2 changes: 2 additions & 0 deletions fivetran_provider_async/example_dags/example_fivetran.py
@@ -22,13 +22,15 @@
task_id="fivetran-task",
fivetran_conn_id="fivetran_default",
connector_id="{{ var.value.connector_id }}",
deferrable=False,
)

fivetran_sync_wait = FivetranSensor(
task_id="fivetran-sensor",
fivetran_conn_id="fivetran_default",
connector_id="{{ var.value.connector_id }}",
poke_interval=5,
deferrable=False,
)

fivetran_sync_start >> fivetran_sync_wait
Original file line number Diff line number Diff line change
@@ -2,8 +2,8 @@

from airflow import DAG

from fivetran_provider_async.operators import FivetranOperator, FivetranOperatorAsync
from fivetran_provider_async.sensors import FivetranSensorAsync
from fivetran_provider_async.operators import FivetranOperator
from fivetran_provider_async.sensors import FivetranSensor

default_args = {
"owner": "Airflow",
@@ -19,17 +19,18 @@
)

with dag:
fivetran_async_op = FivetranOperatorAsync(
fivetran_async_op = FivetranOperator(
task_id="fivetran_async_op",
connector_id="bronzing_largely",
)

fivetran_sync_op = FivetranOperator(
task_id="fivetran_sync_op",
connector_id="bronzing_largely",
deferrable=False,
)

fivetran_async_sensor = FivetranSensorAsync(
fivetran_async_sensor = FivetranSensor(
task_id="fivetran_async_sensor",
connector_id="bronzing_largely",
poke_interval=5,
156 changes: 60 additions & 96 deletions fivetran_provider_async/operators.py
@@ -1,9 +1,12 @@
from typing import Any, Dict, Optional
from __future__ import annotations

from typing import TYPE_CHECKING, Any, Dict, Optional

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator, BaseOperatorLink
from airflow.utils.context import Context
from airflow.utils.decorators import apply_defaults

if TYPE_CHECKING:
from airflow.utils.context import Context

from fivetran_provider_async.hooks import FivetranHook
from fivetran_provider_async.triggers import FivetranTrigger
@@ -24,75 +27,12 @@ def get_link(self, operator, dttm):

class FivetranOperator(BaseOperator):
"""
`FivetranOperator` starts a Fivetran sync job.
`FivetranOperator` requires that you specify the `connector_id` of the sync job to
start. You can find `connector_id` in the Settings page of the connector you
configured in the `Fivetran dashboard <https://fivetran.com/dashboard/connectors>`_.
Note that when a Fivetran sync job is controlled via an Operator, it is no longer
run on the schedule as managed by Fivetran. In other words, it is now scheduled only
from Airflow. This can be changed with the schedule_type parameter.
:param fivetran_conn_id: `Conn ID` of the Connection to be used to configure
the hook.
:type fivetran_conn_id: Optional[str]
:param fivetran_retry_limit: # of retries when encountering API errors
:type fivetran_retry_limit: Optional[int]
:param fivetran_retry_delay: Time to wait before retrying API request
:type fivetran_retry_delay: int
:param connector_id: ID of the Fivetran connector to sync, found on the
Connector settings page.
:type connector_id: str
:param schedule_type: schedule type. Default is "manual" which takes the connector off Fivetran schedule.
Set to "auto" to keep connector on Fivetran schedule.
:type schedule_type: str
"""

operator_extra_links = (RegistryLink(),)

# Define which fields get jinjaified
template_fields = ["connector_id"]

@apply_defaults
def __init__(
self,
connector_id: str,
run_name: Optional[str] = None,
timeout_seconds: Optional[int] = None,
fivetran_conn_id: str = "fivetran",
fivetran_retry_limit: int = 3,
fivetran_retry_delay: int = 1,
poll_frequency: int = 15,
schedule_type: str = "manual",
**kwargs,
):
super().__init__(**kwargs)
self.fivetran_conn_id = fivetran_conn_id
self.fivetran_retry_limit = fivetran_retry_limit
self.fivetran_retry_delay = fivetran_retry_delay
self.connector_id = connector_id
self.poll_frequency = poll_frequency
self.schedule_type = schedule_type

def _get_hook(self) -> FivetranHook:
return FivetranHook(
self.fivetran_conn_id,
retry_limit=self.fivetran_retry_limit,
retry_delay=self.fivetran_retry_delay,
)

def execute(self, context):
hook = self._get_hook()
hook.prep_connector(self.connector_id, self.schedule_type)
return hook.start_fivetran_sync(self.connector_id)


class FivetranOperatorAsync(FivetranOperator):
"""
`FivetranOperatorAsync` submits a Fivetran sync job , and polls for its status on the
airflow trigger.`FivetranOperatorAsync` requires that you specify the `connector_id` of
`FivetranOperator` submits a Fivetran sync job and polls for its status on the
Airflow triggerer. `FivetranOperator` requires that you specify the `connector_id` of
the sync job to start. You can find `connector_id` in the Settings page of the connector
you configured in the `Fivetran dashboard <https://fivetran.com/dashboard/connectors>`_.
If you do not want to run `FivetranOperator` in async mode, you can set `deferrable` to
False on the operator.
:param fivetran_conn_id: `Conn ID` of the Connection to be used to configure the hook.
:param fivetran_retry_limit: # of retries when encountering API errors
@@ -104,8 +44,13 @@ class FivetranOperatorAsync(FivetranOperator):
:param poll_frequency: Time in seconds that the job should wait in between each try.
:param reschedule_wait_time: Optional, if connector is in reset state,
number of seconds to wait before restarting the sync.
:param deferrable: Run operator in deferrable mode. Default is True.
"""

operator_extra_links = (RegistryLink(),)

template_fields = ["connector_id"]

def __init__(
self,
connector_id: str,
@@ -117,6 +62,7 @@ def __init__(
poll_frequency: int = 15,
schedule_type: str = "manual",
reschedule_wait_time: int = 0,
deferrable: bool = True,
**kwargs,
):
self.connector_id = connector_id
@@ -128,38 +74,41 @@ def __init__(
self.poll_frequency = poll_frequency
self.schedule_type = schedule_type
self.reschedule_wait_time = reschedule_wait_time
super().__init__(
connector_id=self.connector_id,
run_name=self.run_name,
timeout_seconds=self.timeout_seconds,
fivetran_conn_id=self.fivetran_conn_id,
fivetran_retry_limit=self.fivetran_retry_limit,
fivetran_retry_delay=self.fivetran_retry_delay,
poll_frequency=self.poll_frequency,
schedule_type=self.schedule_type,
**kwargs,
)
self.deferrable = deferrable
super().__init__(**kwargs)

def execute(self, context: Dict[str, Any]) -> None:
def execute(self, context: Context) -> None | str:
"""Start the sync using synchronous hook"""
hook = self._get_hook()
hook.prep_connector(self.connector_id, self.schedule_type)
hook.start_fivetran_sync(self.connector_id)

# Defer and poll the sync status on the Triggerer
self.defer(
timeout=self.execution_timeout,
trigger=FivetranTrigger(
task_id=self.task_id,
fivetran_conn_id=self.fivetran_conn_id,
connector_id=self.connector_id,
poke_interval=self.poll_frequency,
reschedule_wait_time=self.reschedule_wait_time,
),
method_name="execute_complete",
last_sync = hook.start_fivetran_sync(self.connector_id)

if not self.deferrable:
return last_sync
else:
previous_completed_at = hook.get_last_sync(self.connector_id)
completed = hook.get_sync_status(self.connector_id, previous_completed_at)
if not completed:
self.defer(
timeout=self.execution_timeout,
trigger=FivetranTrigger(
task_id=self.task_id,
fivetran_conn_id=self.fivetran_conn_id,
connector_id=self.connector_id,
poke_interval=self.poll_frequency,
reschedule_wait_time=self.reschedule_wait_time,
),
method_name="execute_complete",
)

def _get_hook(self) -> FivetranHook:
return FivetranHook(
self.fivetran_conn_id,
retry_limit=self.fivetran_retry_limit,
retry_delay=self.fivetran_retry_delay,
)

def execute_complete(self, context: "Context", event: Optional[Dict[Any, Any]] = None) -> None:
def execute_complete(self, context: Context, event: Optional[Dict[Any, Any]] = None) -> None:
"""
Callback for when the trigger fires - returns immediately.
Relies on trigger to throw an exception, otherwise it assumes execution was
@@ -242,3 +191,18 @@ def get_openlineage_facets_on_start(self):

def get_openlineage_facets_on_complete(self, task_instance):
return self.get_openlineage_facets_on_start()


class FivetranOperatorAsync(FivetranOperator):
"""This operator has been deprecated. Please use `FivetranOperator`."""

def __init__(self, *args, **kwargs):
import warnings

super().__init__(*args, **kwargs)

warnings.warn(
"FivetranOperatorAsync has been deprecated. Please use `FivetranOperator`.",
DeprecationWarning,
stacklevel=2,
)
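
As a quick illustration of the backwards-compatibility shim above: the old class name still instantiates, routing through `FivetranOperator`, but emits a `DeprecationWarning`. A minimal check might look like this sketch; the `connector_id` is a placeholder and the operator is created outside a DAG purely for demonstration.

```python
import warnings

from fivetran_provider_async.operators import FivetranOperatorAsync

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    # Instantiation runs FivetranOperator.__init__ first, then emits the warning.
    legacy = FivetranOperatorAsync(task_id="legacy-task", connector_id="my_connector_id")

# The alias's only behavioural difference is the warning itself.
assert any(issubclass(w.category, DeprecationWarning) for w in caught)
```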
