Skip to content

Commit

Permalink
Add methods to get the connector_id (#101)
Browse files Browse the repository at this point in the history
* Add methods to get the connector_id

Add methods to get the connector_id using the Name and Destination from Connectors Dashboard

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Add methods to get the connector_id

fix CI

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
natanlaverde and pre-commit-ci[bot] authored Jul 22, 2024
1 parent 6bd6d5c commit 55b2b69
Show file tree
Hide file tree
Showing 6 changed files with 229 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from datetime import datetime, timedelta

from airflow import DAG

from fivetran_provider_async.operators import FivetranOperator

default_args = {
"owner": "Airflow",
"start_date": datetime(2021, 4, 6),
}

dag = DAG(
dag_id="example_fivetran_without_connector_id",
default_args=default_args,
schedule_interval=timedelta(days=1),
catchup=False,
)

with dag:
fivetran_sync_start = FivetranOperator(
task_id="fivetran_task",
fivetran_conn_id="fivetran_default",
connector_name="{{ var.value.connector_name }}",
destination_name="{{ var.value.destination_name }}",
)
15 changes: 15 additions & 0 deletions fivetran_provider_async/hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,21 @@ def get_connectors(self, group_id: str) -> Iterator[dict]:
for connector in resp["data"]["items"]:
yield connector

def get_connector_id(self, connector_name: str, destination_name: str) -> str:
all_groups = self._get_groups()
group = next((group for group in all_groups if group.get("name") == destination_name), None)
if not group:
raise ValueError(f"Destination '{destination_name}' not found.")

all_connectors = self.get_connectors(group_id=group.get("id", ""))
connector = next(
(connector for connector in all_connectors if connector.get("schema") == connector_name), None
)
if not connector:
raise ValueError(f"Connector '{connector_name}' not found in Destination '{destination_name}'.")

return connector.get("id", "")

def check_connector(self, connector_id: str) -> dict[str, Any]:
"""
Ensures connector configuration has been completed successfully and is in
Expand Down
27 changes: 23 additions & 4 deletions fivetran_provider_async/operators.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@ class FivetranOperator(BaseOperator):
:param fivetran_retry_delay: Time to wait before retrying API request
:param run_name: Fivetran run name
:param timeout_seconds: Timeout in seconds
:param connector_id: ID of the Fivetran connector to sync, found on the Connector settings page.
:param connector_id: Optional, ID of the Fivetran connector to sync, found on the Connector settings page.
:param connector_name: Optional, Name of the Fivetran connector to sync, found on the
Connectors page in the Fivetran Dashboard.
:param destination_name: Optional, Destination of the Fivetran connector to sync, found on the
Connectors page in the Fivetran Dashboard.
:param schedule_type: schedule type. Default is "manual" which takes the connector off Fivetran schedule.
:param poll_frequency: Time in seconds that the job should wait in between each try.
:param reschedule_wait_time: Optional, if connector is in reset state,
Expand All @@ -56,11 +60,13 @@ class FivetranOperator(BaseOperator):

operator_extra_links = (RegistryLink(),)

template_fields = ["connector_id"]
template_fields = ["connector_id", "connector_name", "destination_name"]

def __init__(
self,
connector_id: str,
connector_id: Optional[str] = None,
connector_name: Optional[str] = None,
destination_name: Optional[str] = None,
run_name: Optional[str] = None,
fivetran_conn_id: str = "fivetran_default",
fivetran_retry_limit: int = 3,
Expand All @@ -72,7 +78,9 @@ def __init__(
wait_for_completion: bool = True,
**kwargs,
) -> None:
self.connector_id = connector_id
self._connector_id = connector_id
self.connector_name = connector_name
self.destination_name = destination_name
self.fivetran_conn_id = fivetran_conn_id
self.run_name = run_name

Expand Down Expand Up @@ -158,6 +166,17 @@ def hook(self) -> FivetranHook:
retry_delay=self.fivetran_retry_delay,
)

@cached_property
def connector_id(self) -> str:
if self._connector_id:
return self._connector_id
elif self.connector_name and self.destination_name:
return self.hook.get_connector_id(
connector_name=self.connector_name, destination_name=self.destination_name
)

raise ValueError("No value specified for connector_id or to both connector_name and destination_name")

def execute_complete(self, context: Context, event: Optional[Dict[Any, Any]] = None) -> None:
"""
Callback for when the trigger fires - returns immediately.
Expand Down
32 changes: 26 additions & 6 deletions fivetran_provider_async/sensors.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from datetime import datetime
from functools import cached_property
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, Optional

from airflow.exceptions import AirflowException
from airflow.sensors.base import BaseSensorOperator
Expand Down Expand Up @@ -37,8 +37,9 @@ class FivetranSensor(BaseSensorOperator):
`completed_after_time` is `2020-01-01 03:00:00`, then the sensor will
stop waiting at `03:05:00`.
`FivetranSensor` requires that you specify the `connector_id` of the sync
job to start. You can find `connector_id` in the Settings page of the
`FivetranSensor` requires that you specify either the `connector_id` or
both `connector_name` and `destination_name` of the sync job to start.
You can find `connector_id` in the Settings page of the
connector you configured in the
`Fivetran dashboard <https://fivetran.com/dashboard/connectors>`_.
Expand All @@ -47,8 +48,12 @@ class FivetranSensor(BaseSensorOperator):
:param fivetran_conn_id: `Conn ID` of the Connection to be used to configure
the hook.
:param connector_id: ID of the Fivetran connector to sync, found on the
:param connector_id: Optional. ID of the Fivetran connector to sync, found on the
Connector settings page in the Fivetran Dashboard.
:param connector_name: Optional. Name of the Fivetran connector to sync, found on the
Connectors page in the Fivetran Dashboard.
:param destination_name: Optional. Destination of the Fivetran connector to sync, found on the
Connectors page in the Fivetran Dashboard.
:param poke_interval: Time in seconds that the job should wait in
between each try
:param fivetran_retry_limit: # of retries when encountering API errors
Expand Down Expand Up @@ -79,7 +84,9 @@ class FivetranSensor(BaseSensorOperator):

def __init__(
self,
connector_id: str,
connector_id: Optional[str] = None,
connector_name: Optional[str] = None,
destination_name: Optional[str] = None,
fivetran_conn_id: str = "fivetran_default",
poke_interval: int = 60,
fivetran_retry_limit: int = 3,
Expand All @@ -92,7 +99,9 @@ def __init__(
**kwargs: Any,
) -> None:
self.fivetran_conn_id = fivetran_conn_id
self.connector_id = connector_id
self._connector_id = connector_id
self.connector_name = connector_name
self.destination_name = destination_name
self.poke_interval = poke_interval
self.previous_completed_at: pendulum.DateTime | None = None
self.fivetran_retry_limit = fivetran_retry_limit
Expand Down Expand Up @@ -170,6 +179,17 @@ def hook(self) -> FivetranHook:
retry_delay=self.fivetran_retry_delay,
)

@cached_property
def connector_id(self) -> str:
if self._connector_id:
return self._connector_id
elif self.connector_name and self.destination_name:
return self.hook.get_connector_id(
connector_name=self.connector_name, destination_name=self.destination_name
)

raise ValueError("No value specified for connector_id or to both connector_name and destination_name")

@property
def xcom(self) -> str:
import warnings
Expand Down
16 changes: 16 additions & 0 deletions tests/hooks/test_fivetran.py
Original file line number Diff line number Diff line change
Expand Up @@ -763,6 +763,22 @@ def test_get_connectors(self, m):
assert results[0]["id"] == "iodize_impressive"
assert results[-1]["id"] == "iodize_open"

@requests_mock.mock()
def test_get_connector_id(self, m):
m.get(
"https://api.fivetran.com/v1/groups/",
json=MOCK_FIVETRAN_GROUPS_RESPONSE_PAYLOAD_2,
)
m.get(
"https://api.fivetran.com/v1/groups/rarer_gradient/connectors/",
json=MOCK_FIVETRAN_CONNECTORS_RESPONSE_PAYLOAD_1,
)
hook = FivetranHook(
fivetran_conn_id="conn_fivetran",
)
result = hook.get_connector_id(connector_name="salesforce", destination_name="GoogleSheets")
assert result == "iodize_impressive"

@requests_mock.mock()
def test_start_fivetran_sync(self, m):
m.get(
Expand Down
124 changes: 124 additions & 0 deletions tests/operators/test_fivetran.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,67 @@
},
}

MOCK_FIVETRAN_GROUPS_RESPONSE_PAYLOAD = {
"code": "Success",
"data": {
"items": [
{
"id": "rarer_gradient",
"name": "GoogleSheets",
"created_at": "2022-12-12T17:14:33.790844Z",
},
]
},
}

MOCK_FIVETRAN_CONNECTORS_RESPONSE_PAYLOAD = {
"code": "Success",
"data": {
"items": [
{
"id": "interchangeable_revenge",
"group_id": "rarer_gradient",
"service": "google_sheets",
"service_version": 1,
"schema": "google_sheets.fivetran_google_sheets_spotify",
"connected_by": "mournful_shalt",
"created_at": "2021-03-05T22:58:56.238875Z",
"succeeded_at": "2021-03-23T20:55:12.670390Z",
"failed_at": "null",
"sync_frequency": 360,
"status": {
"setup_state": "connected",
"sync_state": "scheduled",
"update_state": "on_schedule",
"is_historical_sync": False,
"tasks": [],
"warnings": [],
},
},
{
"id": "wicked_impressive",
"group_id": "rarer_gradient",
"service": "netsuite",
"service_version": 1,
"schema": "netsuite",
"connected_by": "concerning_batch",
"created_at": "2018-07-21T22:55:21.724201Z",
"succeeded_at": "2018-12-26T17:58:18.245Z",
"failed_at": "2018-08-24T15:24:58.872491Z",
"sync_frequency": 60,
"status": {
"setup_state": "connected",
"sync_state": "paused",
"update_state": "delayed",
"is_historical_sync": False,
"tasks": [],
"warnings": [],
},
},
]
},
}


@pytest.fixture
def context():
Expand Down Expand Up @@ -195,3 +256,66 @@ def test_default_conn_name(self):
schedule_type="manual",
)
assert task.fivetran_conn_id == FivetranHook.default_conn_name

@requests_mock.mock()
def test_fivetran_op_without_connector_id(self, m):
"""Tests that execute_complete method returns expected result and that it prints expected log"""
m.get(
"https://api.fivetran.com/v1/groups/",
json=MOCK_FIVETRAN_GROUPS_RESPONSE_PAYLOAD,
)
m.get(
"https://api.fivetran.com/v1/groups/rarer_gradient/connectors/",
json=MOCK_FIVETRAN_CONNECTORS_RESPONSE_PAYLOAD,
)

m.get(
"https://api.fivetran.com/v1/connectors/interchangeable_revenge",
json=MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS,
)

m.post(
"https://api.fivetran.com/v1/connectors/interchangeable_revenge/force",
json=MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS,
)

task = FivetranOperator(
task_id="fivetran_op_async",
fivetran_conn_id="conn_fivetran",
connector_name="google_sheets.fivetran_google_sheets_spotify",
destination_name="GoogleSheets",
reschedule_wait_time=60,
schedule_type="manual",
)
with pytest.raises(TaskDeferred):
task.execute(context)

expected_return_value = MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS["data"]["succeeded_at"]

with mock.patch.object(task.log, "info") as mock_log_info:
assert (
task.execute_complete(
context=context,
event={
"status": "success",
"message": "Fivetran sync completed",
"return_value": MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS["data"]["succeeded_at"],
},
)
== expected_return_value
)

mock_log_info.assert_called_with("Fivetran sync completed")

def test_fivetran_op_without_connector_id_error(self):
"""Tests that execute_complete method raises exception in case of error"""
with pytest.raises(ValueError) as exc:
FivetranOperator(
task_id="fivetran_op_async",
fivetran_conn_id="conn_fivetran",
)

assert (
str(exc.value)
== "No value specified for connector_id or to both connector_name and destination_name"
)

0 comments on commit 55b2b69

Please sign in to comment.