From 3294955b843f2173dbd87b104ee21ecc8bec32ac Mon Sep 17 00:00:00 2001 From: Pankaj Singh <98807258+pankajastro@users.noreply.github.com> Date: Tue, 11 Jul 2023 19:27:50 +0530 Subject: [PATCH] Move fivetran sync provider code to this repo (#33) * Move fivetran sync provider code to this repo Move code from https://github.com/fivetran/airflow-provider-fivetran to this repo --- .../example_dags/example_fivetran.py | 34 + .../example_dags/example_fivetran_async.py | 3 +- .../example_dags/example_fivetran_bigquery.py | 88 ++ .../example_dags/example_fivetran_bqml.py | 127 +++ .../example_dags/example_fivetran_dbt.py | 50 + .../example_dags/example_fivetran_xcom.py | 40 + fivetran_provider_async/hooks.py | 472 +++++++++- fivetran_provider_async/operators.py | 81 +- fivetran_provider_async/sensors.py | 82 +- setup.cfg | 2 +- tests/hooks/test_fivetran.py | 860 +++++++++++------- tests/operators/test_fivetran.py | 78 +- tests/sensors/test_fivetran.py | 170 ++-- 13 files changed, 1705 insertions(+), 382 deletions(-) create mode 100644 fivetran_provider_async/example_dags/example_fivetran.py create mode 100644 fivetran_provider_async/example_dags/example_fivetran_bigquery.py create mode 100644 fivetran_provider_async/example_dags/example_fivetran_bqml.py create mode 100644 fivetran_provider_async/example_dags/example_fivetran_dbt.py create mode 100644 fivetran_provider_async/example_dags/example_fivetran_xcom.py diff --git a/fivetran_provider_async/example_dags/example_fivetran.py b/fivetran_provider_async/example_dags/example_fivetran.py new file mode 100644 index 0000000..6c1c8cc --- /dev/null +++ b/fivetran_provider_async/example_dags/example_fivetran.py @@ -0,0 +1,34 @@ +from datetime import datetime, timedelta + +from airflow import DAG + +from fivetran_provider_async.operators import FivetranOperator +from fivetran_provider_async.sensors import FivetranSensor + +default_args = { + "owner": "Airflow", + "start_date": datetime(2021, 4, 6), +} + +dag = DAG( + dag_id="example_fivetran", + default_args=default_args, + schedule_interval=timedelta(days=1), + catchup=False, +) + +with dag: + fivetran_sync_start = FivetranOperator( + task_id="fivetran-task", + fivetran_conn_id="fivetran_default", + connector_id="{{ var.value.connector_id }}", + ) + + fivetran_sync_wait = FivetranSensor( + task_id="fivetran-sensor", + fivetran_conn_id="fivetran_default", + connector_id="{{ var.value.connector_id }}", + poke_interval=5, + ) + + fivetran_sync_start >> fivetran_sync_wait diff --git a/fivetran_provider_async/example_dags/example_fivetran_async.py b/fivetran_provider_async/example_dags/example_fivetran_async.py index c9775fe..63a0c8a 100644 --- a/fivetran_provider_async/example_dags/example_fivetran_async.py +++ b/fivetran_provider_async/example_dags/example_fivetran_async.py @@ -1,9 +1,8 @@ from datetime import datetime, timedelta from airflow import DAG -from fivetran_provider.operators.fivetran import FivetranOperator -from fivetran_provider_async.operators import FivetranOperatorAsync +from fivetran_provider_async.operators import FivetranOperator, FivetranOperatorAsync from fivetran_provider_async.sensors import FivetranSensorAsync default_args = { diff --git a/fivetran_provider_async/example_dags/example_fivetran_bigquery.py b/fivetran_provider_async/example_dags/example_fivetran_bigquery.py new file mode 100644 index 0000000..3c7e601 --- /dev/null +++ b/fivetran_provider_async/example_dags/example_fivetran_bigquery.py @@ -0,0 +1,88 @@ +from airflow import DAG +from airflow.operators.empty import EmptyOperator +from airflow.providers.google.cloud.operators.bigquery import BigQueryValueCheckOperator +from airflow.providers.google.cloud.sensors.bigquery import BigQueryTableExistenceSensor +from airflow.utils.dates import datetime +from fivetran_provider.operators.fivetran import FivetranOperator + +from fivetran_provider_async.sensors import FivetranSensor + +TABLE = "forestfires" +DATASET = "google_sheets" +# These args will get passed on to each operator +# You can override them on a per-task basis during operator initialization +default_args = { + "owner": "astronomer", + "depends_on_past": False, + "start_date": datetime(2021, 7, 7), + "email": ["noreply@astronomer.io"], + "email_on_failure": False, +} + +with DAG( + "example_fivetran_bigquery", + default_args=default_args, + description="", + schedule_interval=None, + catchup=False, +) as dag: + """ + ### Simple EL Pipeline with Data Integrity and Quality Checks + Before running the DAG, set the following in an Airflow or Environment Variables: + - key: gcp_project_id + value: [gcp_project_id] + - key: connector_id + value: [connector_id] + Fully replacing [gcp_project_id] & [connector_id] with the actual IDs. + What makes this a simple data quality case is: + 1. Absolute ground truth: the local CSV file is considered perfect and immutable. + 2. No transformations or business logic. + 3. Exact values of data to quality check are known. + """ + + """ + #### FivetranOperator & FivetranSensor + Calling Fivetran to begin data movement from Google Sheets to BigQuery + The FivetranSensor monitors the status of the Fivetran data sync + """ + fivetran_sync_start = FivetranOperator( + task_id="fivetran-task", + fivetran_conn_id="fivetran_default", + connector_id="{{ var.value.connector_id }}", + ) + + fivetran_sync_wait = FivetranSensor( + task_id="fivetran-sensor", + fivetran_conn_id="fivetran_default", + connector_id="{{ var.value.connector_id }}", + poke_interval=5, + ) + + """ + #### BigQuery row validation task + Ensure that data was copied to BigQuery correctly, i.e. the table and dataset + exist. + """ + validate_bigquery = BigQueryTableExistenceSensor( + task_id="validate_bigquery", + project_id="{{ var.value.gcp_project_id }}", + dataset_id=DATASET, + table_id="forestfires", + ) + + """ + #### Row-level data quality check + Run a data quality check on a few rows, ensuring that the data in BigQuery + matches the ground truth in the correspoding JSON file. + """ + check_bq_row_count = BigQueryValueCheckOperator( + task_id="check_row_count", + sql=f"SELECT COUNT(*) FROM {DATASET}.{TABLE}", + pass_value=516, + use_legacy_sql=False, + ) + + done = EmptyOperator(task_id="done") + + fivetran_sync_start >> fivetran_sync_wait >> validate_bigquery + validate_bigquery >> check_bq_row_count >> done diff --git a/fivetran_provider_async/example_dags/example_fivetran_bqml.py b/fivetran_provider_async/example_dags/example_fivetran_bqml.py new file mode 100644 index 0000000..1ba602c --- /dev/null +++ b/fivetran_provider_async/example_dags/example_fivetran_bqml.py @@ -0,0 +1,127 @@ +from datetime import datetime, timedelta + +from airflow import DAG +from airflow.operators.python_operator import BranchPythonOperator +from airflow.providers.google.cloud.operators.bigquery import ( + BigQueryExecuteQueryOperator, + BigQueryGetDataOperator, +) +from airflow.providers.ssh.operators.ssh import SSHOperator + +from fivetran_provider_async.operators import FivetranOperator +from fivetran_provider_async.sensors import FivetranSensor + +# EDIT WITH YOUR PROJECT ID & DATASET NAME +PROJECT_ID = "YOUR PROJECT ID" +DATASET_NAME = "bqml" +DESTINATION_TABLE = "dbt_ads_bqml_preds" + +TRAINING_QUERY = ( + "CREATE OR REPLACE MODEL bqml.dbt_ads_airflow_model " + "OPTIONS " + "(model_type = 'ARIMA_PLUS', " + "time_series_timestamp_col = 'parsed_date', " + "time_series_data_col = 'daily_impressions', " + "auto_arima = TRUE, " + "data_frequency = 'AUTO_FREQUENCY', " + "decompose_time_series = TRUE " + ") AS " + "SELECT " + "timestamp(date_day) as parsed_date, " + "SUM(impressions) as daily_impressions " + "FROM `" + PROJECT_ID + ".bqml.ad_reporting` " + "GROUP BY date_day;" +) + +SERVING_QUERY = ( + "SELECT string(forecast_timestamp) as forecast_timestamp, " + "forecast_value, " + "standard_error, " + "confidence_level, " + "prediction_interval_lower_bound, " + "prediction_interval_upper_bound, " + "confidence_interval_lower_bound, " + "confidence_interval_upper_bound " + "FROM ML.FORECAST(MODEL `" + + PROJECT_ID + + ".bqml.dbt_ads_airflow_model`,STRUCT(30 AS horizon, 0.8 AS confidence_level));" +) + + +def ml_branch(ds, **kwargs): + if "train" in kwargs["params"] and kwargs["params"]["train"]: + return "train_model" + else: + return "get_predictions" + + +default_args = { + "owner": "Airflow", + "start_date": datetime(2021, 4, 6), +} + +dag = DAG( + dag_id="example_fivetran_bqml", + default_args=default_args, + schedule_interval=timedelta(days=1), + catchup=False, +) + +with dag: + linkedin_sync = FivetranOperator( + task_id="linkedin-sync", + fivetran_conn_id="fivetran_default", + connector_id="{{ var.value.linkedin_connector_id }}", + ) + + linkedin_sensor = FivetranSensor( + task_id="linkedin-sensor", + fivetran_conn_id="fivetran_default", + connector_id="{{ var.value.linkedin_connector_id }}", + poke_interval=5, + ) + + twitter_sync = FivetranOperator( + task_id="twitter-sync", + fivetran_conn_id="fivetran_default", + connector_id="{{ var.value.twitter_connector_id }}", + ) + + twitter_sensor = FivetranSensor( + task_id="twitter-sensor", + fivetran_conn_id="fivetran_default", + connector_id="{{ var.value.twitter_connector_id }}", + poke_interval=5, + ) + + dbt_run = SSHOperator( + task_id="dbt_ad_reporting", + command="cd dbt_ad_reporting ; ~/.local/bin/dbt run -m +ad_reporting", + ssh_conn_id="dbtvm", + ) + + ml_branch = BranchPythonOperator(task_id="ml_branch", python_callable=ml_branch, provide_context=True) + + train_model = BigQueryExecuteQueryOperator( + task_id="train_model", sql=TRAINING_QUERY, use_legacy_sql=False + ) + + get_preds = BigQueryExecuteQueryOperator( + task_id="get_predictions", + sql=SERVING_QUERY, + use_legacy_sql=False, + destination_dataset_table=DATASET_NAME + "." + DESTINATION_TABLE, + write_disposition="WRITE_APPEND", + ) + + print_preds = BigQueryGetDataOperator( + task_id="print_predictions", dataset_id=DATASET_NAME, table_id=DESTINATION_TABLE + ) + + linkedin_sync >> linkedin_sensor + twitter_sync >> twitter_sensor + + [linkedin_sensor, twitter_sensor] >> dbt_run + + dbt_run >> ml_branch >> [train_model, get_preds] + get_preds >> print_preds diff --git a/fivetran_provider_async/example_dags/example_fivetran_dbt.py b/fivetran_provider_async/example_dags/example_fivetran_dbt.py new file mode 100644 index 0000000..4933b17 --- /dev/null +++ b/fivetran_provider_async/example_dags/example_fivetran_dbt.py @@ -0,0 +1,50 @@ +from datetime import datetime, timedelta + +from airflow import DAG +from airflow.providers.ssh.operators.ssh import SSHOperator + +from fivetran_provider_async.operators import FivetranOperator +from fivetran_provider_async.sensors import FivetranSensor + +default_args = { + "owner": "Airflow", + "start_date": datetime(2021, 4, 6), +} + +with DAG( + dag_id="ad_reporting_dag", + default_args=default_args, + schedule_interval=timedelta(days=1), + catchup=False, +) as dag: + linkedin_sync = FivetranOperator( + task_id="linkedin-ads-sync", + connector_id="{{ var.value.linkedin_connector_id }}", + ) + + linkedin_sensor = FivetranSensor( + task_id="linkedin-sensor", + connector_id="{{ var.value.linkedin_connector_id }}", + poke_interval=600, + ) + + twitter_sync = FivetranOperator( + task_id="twitter-ads-sync", + connector_id="{{ var.value.twitter_connector_id }}", + ) + + twitter_sensor = FivetranSensor( + task_id="twitter-sensor", + connector_id="{{ var.value.twitter_connector_id }}", + poke_interval=600, + ) + + dbt_run = SSHOperator( + task_id="dbt_ad_reporting", + command="cd dbt_ad_reporting ; ~/.local/bin/dbt run -m +ad_reporting", + ssh_conn_id="dbtvm", + ) + + linkedin_sync >> linkedin_sensor + twitter_sync >> twitter_sensor + [linkedin_sensor, twitter_sensor] >> dbt_run diff --git a/fivetran_provider_async/example_dags/example_fivetran_xcom.py b/fivetran_provider_async/example_dags/example_fivetran_xcom.py new file mode 100644 index 0000000..4946e82 --- /dev/null +++ b/fivetran_provider_async/example_dags/example_fivetran_xcom.py @@ -0,0 +1,40 @@ +import time +from datetime import datetime, timedelta + +from airflow import DAG +from airflow.operators.python import PythonOperator + +from fivetran_provider_async.operators import FivetranOperator +from fivetran_provider_async.sensors import FivetranSensor + +default_args = { + "owner": "Airflow", + "start_date": datetime(2021, 4, 6), + "provide_context": True, +} + +dag = DAG( + dag_id="example_fivetran_xcom", + default_args=default_args, + schedule_interval=timedelta(days=1), + catchup=False, +) + +with dag: + fivetran_operator = FivetranOperator( + task_id="fivetran-operator", + fivetran_conn_id="fivetran_default", + connector_id="{{ var.value.connector_id }}", + ) + + delay_task = PythonOperator(task_id="delay_python_task", python_callable=lambda: time.sleep(60)) + + fivetran_sensor = FivetranSensor( + task_id="fivetran-sensor", + fivetran_conn_id="fivetran_default", + connector_id="{{ var.value.connector_id }}", + poke_interval=5, + xcom="{{ task_instance.xcom_pull('fivetran-operator', key='return_value') }}", + ) + + fivetran_operator >> delay_task >> fivetran_sensor diff --git a/fivetran_provider_async/hooks.py b/fivetran_provider_async/hooks.py index ff47ef3..1f4d7fd 100644 --- a/fivetran_provider_async/hooks.py +++ b/fivetran_provider_async/hooks.py @@ -1,13 +1,472 @@ import asyncio +import json import time +from time import sleep from typing import Any, Dict, cast import aiohttp import pendulum +import requests from aiohttp import ClientResponseError from airflow.exceptions import AirflowException +from airflow.hooks.base import BaseHook from asgiref.sync import sync_to_async -from fivetran_provider.hooks.fivetran import FivetranHook +from requests import exceptions as requests_exceptions + + +class FivetranHook(BaseHook): + """ + Fivetran API interaction hook. + + :param fivetran_conn_id: `Conn ID` of the Connection to be used to + configure this hook. + :type fivetran_conn_id: str + :param timeout_seconds: The amount of time in seconds the requests library + will wait before timing out. + :type timeout_seconds: int + :param retry_limit: The number of times to retry the connection in case of + service outages. + :type retry_limit: int + :param retry_delay: The number of seconds to wait between retries. + :type retry_delay: float + """ + + conn_name_attr = "fivetran_conn_id" + default_conn_name = "fivetran_default" + conn_type = "fivetran" + hook_name = "Fivetran" + api_user_agent = "airflow_provider_fivetran/1.1.4" + api_protocol = "https" + api_host = "api.fivetran.com" + api_path_connectors = "v1/connectors/" + api_metadata_path_connectors = "v1/metadata/connectors/" + api_path_destinations = "v1/destinations/" + api_path_groups = "v1/groups/" + + @staticmethod + def get_ui_field_behaviour() -> Dict: + """Returns custom field behaviour""" + return { + "hidden_fields": ["schema", "port", "extra", "host"], + "relabeling": { + "login": "Fivetran API Key", + "password": "Fivetran API Secret", + }, + "placeholders": { + "login": "api key", + "password": "api secret", + }, + } + + @staticmethod + def _get_airflow_version() -> str: + """ + Fetch and return the current Airflow version + from aws provider + https://github.com/apache/airflow/blob/main/airflow/providers/amazon/aws/hooks/base_aws.py#L486 + """ + try: + # This can be a circular import under specific configurations. + # Importing locally to either avoid or catch it if it does happen. + from airflow import __version__ as airflow_version + + return "-airflow_version/" + airflow_version + except Exception: + # Under no condition should an error here ever cause an issue for the user. + return "" + + def __init__( + self, + fivetran_conn_id: str = "fivetran", + fivetran_conn=None, + timeout_seconds: int = 180, + retry_limit: int = 3, + retry_delay: float = 1.0, + ) -> None: + super().__init__(None) # Passing None fixes a runtime problem in Airflow 1 + self.conn_id = fivetran_conn_id + self.fivetran_conn = fivetran_conn + self.timeout_seconds = timeout_seconds + if retry_limit < 1: + raise ValueError("Retry limit must be greater than equal to 1") + self.retry_limit = retry_limit + self.retry_delay = retry_delay + + def _do_api_call(self, endpoint_info, json=None): + """ + Utility function to perform an API call with retries + + :param endpoint_info: Tuple of method and endpoint + :type endpoint_info: tuple[string, string] + :param json: Parameters for this API call. + :type json: dict + :return: If the api call returns a OK status code, + this function returns the response in JSON. Otherwise, + we throw an AirflowException. + :rtype: dict + """ + method, endpoint = endpoint_info + if self.fivetran_conn is None: + self.fivetran_conn = self.get_connection(self.conn_id) + auth = (self.fivetran_conn.login, self.fivetran_conn.password) + url = f"{self.api_protocol}://{self.api_host}/{endpoint}" + + headers = {"User-Agent": self.api_user_agent + self._get_airflow_version()} + + if method == "GET": + request_func = requests.get + elif method == "POST": + request_func = requests.post + headers.update({"Content-Type": "application/json;version=2"}) + elif method == "PATCH": + request_func = requests.patch + headers.update({"Content-Type": "application/json;version=2"}) + else: + raise AirflowException("Unexpected HTTP Method: " + method) + + attempt_num = 1 + while True: + try: + response = request_func( + url, + data=json if method in ("POST", "PATCH") else None, + params=json if method in ("GET") else None, + auth=auth, + headers=headers, + ) + response.raise_for_status() + return response.json() + except requests_exceptions.RequestException as e: + if not _retryable_error(e): + # In this case, the user probably made a mistake. + # Don't retry. + raise AirflowException( + f"Response: {e.response.content}, " f"Status Code: {e.response.status_code}" + ) + + self._log_request_error(attempt_num, e) + + if attempt_num == self.retry_limit: + raise AirflowException( + f"API request to Fivetran failed {self.retry_limit} times." " Giving up." + ) + + attempt_num += 1 + sleep(self.retry_delay) + + def _log_request_error(self, attempt_num: int, error: str) -> None: + self.log.error( + "Attempt %s API Request to Fivetran failed with reason: %s", + attempt_num, + error, + ) + + def _connector_ui_url(self, service_name, schema_name): + return f"https://fivetran.com/dashboard/connectors/" f"{service_name}/{schema_name}" + + def _connector_ui_url_logs(self, service_name, schema_name): + return self._connector_ui_url(service_name, schema_name) + "/logs" + + def _connector_ui_url_setup(self, service_name, schema_name): + return self._connector_ui_url(service_name, schema_name) + "/setup" + + def get_connector(self, connector_id) -> dict: + """ + Fetches the detail of a connector. + + :param connector_id: Fivetran connector_id, found in connector settings + page in the Fivetran user interface. + :type connector_id: str + :return: connector details + :rtype: Dict + """ + if connector_id == "": + raise ValueError("No value specified for connector_id") + endpoint = self.api_path_connectors + connector_id + resp = self._do_api_call(("GET", endpoint)) + return resp["data"] + + def get_connector_schemas(self, connector_id) -> dict: + """ + Fetches schema information of the connector. + + :param connector_id: Fivetran connector_id, found in connector settings + page in the Fivetran user interface. + :type connector_id: str + :return: schema details + :rtype: Dict + """ + if connector_id == "": + raise ValueError("No value specified for connector_id") + endpoint = self.api_path_connectors + connector_id + "/schemas" + resp = self._do_api_call(("GET", endpoint)) + return resp["data"] + + def get_metadata(self, connector_id, metadata) -> dict: + """ + Fetches metadata for a given metadata string and connector. + + The Fivetran metadata API is currently in beta and available to + all Fivetran users on the enterprise plan and above. + + :param connector_id: Fivetran connector_id, found in connector settings + page in the Fivetran user interface. + :type connector_id: str + :param metadata: The string to return the type of metadata from the API + :type metadata: str + :return: table or column metadata details + :rtype: Dict + """ + metadata_values = ("tables", "columns") + if connector_id == "": + raise ValueError("No value specified for connector_id") + if metadata not in metadata_values: + raise ValueError(f"Got {metadata} for param 'metadata', expected one" f" of: {metadata_values}") + endpoint = self.api_metadata_path_connectors + connector_id + "/" + metadata + resp = self._do_api_call(("GET", endpoint)) + return resp["data"] + + def get_destinations(self, group_id) -> dict: + """ + Fetches destination information for the given group. + :param group_id: The Fivetran group ID, returned by a connector API call. + :type group_id: str + :return: destination details + :rtype: Dict + """ + if group_id == "": + raise ValueError("No value specified for group_id") + endpoint = self.api_path_destinations + group_id + resp = self._do_api_call(("GET", endpoint)) + return resp["data"] + + def get_groups(self, group_id) -> dict: + """ + Fetches destination information for the given group. + :param group_id: The Fivetran group ID, returned by a connector API call. + :type group_id: str + :return: group details + :rtype: Dict + """ + if group_id == "": + raise ValueError("No value specified for connector_id") + endpoint = self.api_path_groups + group_id + resp = self._do_api_call(("GET", endpoint)) + return resp["data"] + + def check_connector(self, connector_id): + """ + Ensures connector configuration has been completed successfully and is in + a functional state. + + :param connector_id: Fivetran connector_id, found in connector settings + page in the Fivetran user interface. + :type connector_id: str + """ + connector_details = self.get_connector(connector_id) + service_name = connector_details["service"] + schema_name = connector_details["schema"] + setup_state = connector_details["status"]["setup_state"] + if setup_state != "connected": + raise AirflowException( + f'Fivetran connector "{connector_id}" not correctly configured, ' + f"status: {setup_state}\nPlease see: " + f"{self._connector_ui_url_setup(service_name, schema_name)}" + ) + self.log.info("Connector type: %s, connector schema: %s", service_name, schema_name) + self.log.info("Connectors logs at %s", self._connector_ui_url_logs(service_name, schema_name)) + return True + + def set_schedule_type(self, connector_id, schedule_type): + """ + Set connector sync mode to switch sync control between API and UI. + + :param connector_id: Fivetran connector_id, found in connector settings + page in the Fivetran user interface. + :type connector_id: str + :param schedule_type: "manual" (schedule controlled via Airlow) or + "auto" (schedule controlled via Fivetran) + :type schedule_type: str + """ + endpoint = self.api_path_connectors + connector_id + return self._do_api_call(("PATCH", endpoint), json.dumps({"schedule_type": schedule_type})) + + def prep_connector(self, connector_id, schedule_type): + """ + Prepare the connector to run in Airflow by checking that it exists and is a good state, + then update connector sync schedule type if changed. + + :param connector_id: Fivetran connector_id, found in connector settings + page in the Fivetran user interface. + :type connector_id: str + :param schedule_type: Fivetran connector schedule type + :type schedule_type: str + """ + self.check_connector(connector_id) + if schedule_type not in {"manual", "auto"}: + raise ValueError('schedule_type must be either "manual" or "auto"') + if self.get_connector(connector_id)["schedule_type"] != schedule_type: + return self.set_schedule_type(connector_id, schedule_type) + return True + + def start_fivetran_sync(self, connector_id): + """ + :param connector_id: Fivetran connector_id, found in connector settings + page in the Fivetran user interface. + :type connector_id: str + :return: Timestamp of previously completed sync + :rtype: str + """ + connector_details = self.get_connector(connector_id) + succeeded_at = connector_details["succeeded_at"] + failed_at = connector_details["failed_at"] + endpoint = self.api_path_connectors + connector_id + if self._do_api_call(("GET", endpoint))["data"]["paused"] is True: + self._do_api_call(("PATCH", endpoint), json.dumps({"paused": False})) + if succeeded_at is None and failed_at is None: + succeeded_at = str(pendulum.now()) + self._do_api_call(("POST", endpoint + "/force")) + + failed_at_time = None + try: + failed_at_time = self._parse_timestamp(failed_at) + except Exception: + self.log.error("Pendulum.parsing.exception occured") + + last_sync = ( + succeeded_at + if failed_at_time is None + or self._parse_timestamp(succeeded_at) > self._parse_timestamp(failed_at) + else failed_at + ) + return last_sync + + def get_last_sync(self, connector_id, xcom=""): + """ + Get the last time Fivetran connector completed a sync. + Used with FivetranSensor to monitor sync completion status. + + :param connector_id: Fivetran connector_id, found in connector settings + page in the Fivetran user interface. + :type connector_id: str + :param xcom: Timestamp as string pull from FivetranOperator via XCOM + :type xcom: str + :return: Timestamp of last completed sync + :rtype: Pendulum.DateTime + """ + if xcom: + last_sync = self._parse_timestamp(xcom) + else: + connector_details = self.get_connector(connector_id) + succeeded_at = self._parse_timestamp(connector_details["succeeded_at"]) + failed_at = self._parse_timestamp(connector_details["failed_at"]) + last_sync = succeeded_at if succeeded_at > failed_at else failed_at + return last_sync + + def get_sync_status(self, connector_id, previous_completed_at, reschedule_time=0): + """ + For sensor, return True if connector's 'succeeded_at' field has updated. + + :param connector_id: Fivetran connector_id, found in connector settings + page in the Fivetran user interface. + :type connector_id: str + :param previous_completed_at: The last time the connector ran, collected on Sensor + initialization. + :type previous_completed_at: pendulum.datetime.DateTime + :param reschedule_time: Optional, if connector is in reset state + number of seconds to wait before restarting, else Fivetran suggestion used + :type reschedule_time: int + """ + # @todo Need logic here to tell if the sync is not running at all and not + # likely to run in the near future. + connector_details = self.get_connector(connector_id) + succeeded_at = self._parse_timestamp(connector_details["succeeded_at"]) + failed_at = self._parse_timestamp(connector_details["failed_at"]) + current_completed_at = succeeded_at if succeeded_at > failed_at else failed_at + + # The only way to tell if a sync failed is to check if its latest + # failed_at value is greater than then last known "sync completed at" value. + if failed_at > previous_completed_at: + service_name = connector_details["service"] + schema_name = connector_details["schema"] + raise AirflowException( + f'Fivetran sync for connector "{connector_id}" failed; ' + f"please see logs at " + f"{self._connector_ui_url_logs(service_name, schema_name)}" + ) + + sync_state = connector_details["status"]["sync_state"] + self.log.info("Connector %s: sync_state = %s", connector_id, sync_state) + + # if sync in resheduled start, wait for time recommended by Fivetran + # or manually specified, then restart sync + if sync_state == "rescheduled" and connector_details["schedule_type"] == "manual": + self.log.info('Connector is in "rescheduled" state and needs to be manually restarted') + self.pause_and_restart( + connector_id, connector_details["status"]["rescheduled_for"], reschedule_time + ) + return False + + # Check if sync started by FivetranOperator has finished + # indicated by new 'succeeded_at' timestamp + if current_completed_at > previous_completed_at: + self.log.info("Connector %s: succeeded_at: %s, connector_id", succeeded_at.to_iso8601_string()) + return True + else: + return False + + def pause_and_restart(self, connector_id, reschedule_for, reschedule_time): + """ + While a connector is syncing, if it falls into a reschedule state, + wait for a time either specified by the user of recommended by Fivetran, + Then restart a sync + + :param connector_id: Fivetran connector_id, found in connector settings + page in the Fivetran user interface. + :type connector_id: str + :param reschedule_for: From connector details, if schedule_type is manual, + then the connector expects triggering the event at the designated UTC time + :type reschedule_for: str + :param reschedule_time: Optional, if connector is in reset state + number of seconds to wait before restarting, else Fivetran suggestion used + :type reschedule_time: int + """ + if reschedule_time: + self.log.info("Starting connector again in %s seconds", reschedule_time) + time.sleep(reschedule_time) + else: + wait_time = ( + self._parse_timestamp(reschedule_for).add(minutes=1) - pendulum.now(tz="UTC") + ).seconds + self.log.info("Starting connector again in %s seconds", wait_time) + time.sleep(wait_time) + + self.log.info("Restarting connector now") + return self.start_fivetran_sync(connector_id) + + def _parse_timestamp(self, api_time): + """ + Returns either the pendulum-parsed actual timestamp or + a very out-of-date timestamp if not set + + :param api_time: timestamp format as returned by the Fivetran API. + :type api_time: str + :rtype: Pendulum.DateTime + """ + return pendulum.parse(api_time) if api_time is not None else pendulum.from_timestamp(-1) + + def test_connection(self): + """ + Ensures Airflow can reach Fivetran API + """ + try: + resp = self._do_api_call(("GET", "v1/users")) + if resp["code"] == "Success": + return True, "Fivetran connection test passed" + else: + return False, resp + except Exception as e: + return False, str(e) class FivetranHookAsync(FivetranHook): @@ -222,3 +681,14 @@ async def get_last_sync_async(self, connector_id, xcom=""): def _retryable_error_async(exception: ClientResponseError) -> bool: return exception.status >= 500 + + +def _retryable_error(exception) -> bool: + return ( + isinstance( + exception, + (requests_exceptions.ConnectionError, requests_exceptions.Timeout), + ) + or exception.response is not None + and exception.response.status_code >= 500 + ) diff --git a/fivetran_provider_async/operators.py b/fivetran_provider_async/operators.py index eecffd8..21c51d5 100644 --- a/fivetran_provider_async/operators.py +++ b/fivetran_provider_async/operators.py @@ -1,13 +1,92 @@ from typing import Any, Dict, Optional from airflow.exceptions import AirflowException +from airflow.models import BaseOperator, BaseOperatorLink from airflow.utils.context import Context -from fivetran_provider.operators.fivetran import FivetranOperator +from airflow.utils.decorators import apply_defaults +from fivetran_provider_async.hooks import FivetranHook from fivetran_provider_async.triggers import FivetranTrigger from fivetran_provider_async.utils.operator_utils import datasets +class RegistryLink(BaseOperatorLink): + """Link to Registry""" + + name = "Astronomer Registry" + + def get_link(self, operator, dttm): + """Get link to registry page.""" + + registry_link = "https://registry.astronomer.io/providers/{provider}/modules/{operator}" + return registry_link.format(provider="fivetran", operator="fivetranoperator") + + +class FivetranOperator(BaseOperator): + """ + `FivetranOperator` starts a Fivetran sync job. + + `FivetranOperator` requires that you specify the `connector_id` of the sync job to + start. You can find `connector_id` in the Settings page of the connector you + configured in the `Fivetran dashboard `_. + Note that when a Fivetran sync job is controlled via an Operator, it is no longer + run on the schedule as managed by Fivetran. In other words, it is now scheduled only + from Airflow. This can be changed with the schedule_type parameter. + + :param fivetran_conn_id: `Conn ID` of the Connection to be used to configure + the hook. + :type fivetran_conn_id: Optional[str] + :param fivetran_retry_limit: # of retries when encountering API errors + :type fivetran_retry_limit: Optional[int] + :param fivetran_retry_delay: Time to wait before retrying API request + :type fivetran_retry_delay: int + :param connector_id: ID of the Fivetran connector to sync, found on the + Connector settings page. + :type connector_id: str + :param schedule_type: schedule type. Default is "manual" which takes the connector off Fivetran schedule. + Set to "auto" to keep connector on Fivetran schedule. + :type schedule_type: str + """ + + operator_extra_links = (RegistryLink(),) + + # Define which fields get jinjaified + template_fields = ["connector_id"] + + @apply_defaults + def __init__( + self, + connector_id: str, + run_name: Optional[str] = None, + timeout_seconds: Optional[int] = None, + fivetran_conn_id: str = "fivetran", + fivetran_retry_limit: int = 3, + fivetran_retry_delay: int = 1, + poll_frequency: int = 15, + schedule_type: str = "manual", + **kwargs, + ): + super().__init__(**kwargs) + self.fivetran_conn_id = fivetran_conn_id + self.fivetran_retry_limit = fivetran_retry_limit + self.fivetran_retry_delay = fivetran_retry_delay + self.connector_id = connector_id + self.poll_frequency = poll_frequency + self.schedule_type = schedule_type + + def _get_hook(self) -> FivetranHook: + return FivetranHook( + self.fivetran_conn_id, + retry_limit=self.fivetran_retry_limit, + retry_delay=self.fivetran_retry_delay, + ) + + def execute(self, context): + hook = self._get_hook() + hook.prep_connector(self.connector_id, self.schedule_type) + return hook.start_fivetran_sync(self.connector_id) + + class FivetranOperatorAsync(FivetranOperator): """ `FivetranOperatorAsync` submits a Fivetran sync job , and polls for its status on the diff --git a/fivetran_provider_async/sensors.py b/fivetran_provider_async/sensors.py index 7585764..fe11876 100644 --- a/fivetran_provider_async/sensors.py +++ b/fivetran_provider_async/sensors.py @@ -1,13 +1,93 @@ from typing import Any, Dict, Optional from airflow.exceptions import AirflowException +from airflow.sensors.base import BaseSensorOperator from airflow.utils.context import Context from airflow.utils.decorators import apply_defaults -from fivetran_provider.sensors.fivetran import FivetranSensor +from fivetran_provider_async.hooks import FivetranHook from fivetran_provider_async.triggers import FivetranTrigger +class FivetranSensor(BaseSensorOperator): + """ + `FivetranSensor` monitors a Fivetran sync job for completion. + + Monitoring with `FivetranSensor` allows you to trigger downstream processes only + when the Fivetran sync jobs have completed, ensuring data consistency. You can + use multiple instances of `FivetranSensor` to monitor multiple Fivetran + connectors. Note, it is possible to monitor a sync that is scheduled and managed + from Fivetran; in other words, you can use `FivetranSensor` without using + `FivetranOperator`. If used in this way, your DAG will wait until the sync job + starts on its Fivetran-controlled schedule and then completes. `FivetranSensor` + requires that you specify the `connector_id` of the sync job to start. You can + find `connector_id` in the Settings page of the connector you configured in the + `Fivetran dashboard `_. + + + :param fivetran_conn_id: `Conn ID` of the Connection to be used to configure + the hook. + :type fivetran_conn_id: str + :param connector_id: ID of the Fivetran connector to sync, found on the + Connector settings page in the Fivetran Dashboard. + :type connector_id: str + :param poke_interval: Time in seconds that the job should wait in + between each tries + :type poke_interval: int + :param fivetran_retry_limit: # of retries when encountering API errors + :type fivetran_retry_limit: Optional[int] + :param fivetran_retry_delay: Time to wait before retrying API request + :type fivetran_retry_delay: int + :param xcom: If used, FivetranSensor receives timestamp of previously + completed sync from FivetranOperator via XCOM + :type xcom: str + :param reschedule_time: Optional, if connector is in reset state + number of seconds to wait before restarting, else Fivetran suggestion used + :type reschedule_time: int + """ + + # Define which fields get jinjaified + template_fields = ["connector_id", "xcom"] + + @apply_defaults + def __init__( + self, + connector_id: str, + fivetran_conn_id: str = "fivetran", + poke_interval: int = 60, + fivetran_retry_limit: int = 3, + fivetran_retry_delay: int = 1, + xcom: str = "", + reschedule_time: int = 0, + **kwargs: Any, + ) -> None: + super().__init__(**kwargs) + self.fivetran_conn_id = fivetran_conn_id + self.connector_id = connector_id + self.poke_interval = poke_interval + self.previous_completed_at = None + self.fivetran_retry_limit = fivetran_retry_limit + self.fivetran_retry_delay = fivetran_retry_delay + self.hook = None + self.xcom = xcom + self.reschedule_time = reschedule_time + + def _get_hook(self) -> FivetranHook: + if self.hook is None: + self.hook = FivetranHook( + self.fivetran_conn_id, + retry_limit=self.fivetran_retry_limit, + retry_delay=self.fivetran_retry_delay, + ) + return self.hook + + def poke(self, context): + hook = self._get_hook() + if self.previous_completed_at is None: + self.previous_completed_at = hook.get_last_sync(self.connector_id, self.xcom) + return hook.get_sync_status(self.connector_id, self.previous_completed_at, self.reschedule_time) + + class FivetranSensorAsync(FivetranSensor): """ `FivetranSensorAsync` asynchronously monitors a Fivetran sync job for completion. diff --git a/setup.cfg b/setup.cfg index 8241758..68b9297 100644 --- a/setup.cfg +++ b/setup.cfg @@ -33,7 +33,7 @@ install_requires = apache-airflow>=2.2.0 aiohttp asgiref - airflow-provider-fivetran + requests [options.extras_require] diff --git a/tests/hooks/test_fivetran.py b/tests/hooks/test_fivetran.py index 89a22d2..c1c8b4c 100644 --- a/tests/hooks/test_fivetran.py +++ b/tests/hooks/test_fivetran.py @@ -1,12 +1,14 @@ +import unittest from unittest import mock import multidict import pendulum import pytest +import requests_mock from aiohttp import ClientResponseError, RequestInfo from airflow.exceptions import AirflowException -from fivetran_provider_async.hooks import FivetranHookAsync +from fivetran_provider_async.hooks import FivetranHook, FivetranHookAsync from tests.common.static import ( LOGIN, MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS, @@ -15,339 +17,561 @@ PASSWORD, ) - -@pytest.mark.asyncio -@mock.patch("fivetran_provider_async.hooks.FivetranHookAsync._do_api_call_async") -async def test_fivetran_hook_get_connector_async(mock_api_call_async_response): - """Tests that the get_connector_async method fetches the details of a connector""" - hook = FivetranHookAsync(fivetran_conn_id="conn_fivetran") - mock_api_call_async_response.return_value = MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS - result = await hook.get_connector_async(connector_id="interchangeable_revenge") - assert result["status"]["setup_state"] == "connected" - - -@pytest.mark.asyncio -@mock.patch("fivetran_provider_async.hooks.FivetranHookAsync._do_api_call_async") -async def test_fivetran_hook_get_connector_async_error(mock_api_call_async_response): - """Tests that the get_connector_async method raises exception when connector_id is not specified""" - hook = FivetranHookAsync(fivetran_conn_id="conn_fivetran") - mock_api_call_async_response.return_value = MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS - with pytest.raises(ValueError) as exc: - await hook.get_connector_async(connector_id="") - assert str(exc.value) == "No value specified for connector_id" - - -@pytest.mark.asyncio -@pytest.mark.parametrize( - "mock_previous_completed_at, expected_result", - [ - ( - pendulum.datetime(2021, 3, 23), # current_completed_at > previous_completed_at - "success", - ), - ( - pendulum.datetime(2021, 3, 23, 21, 55), # current_completed_at < previous_completed_at - "pending", - ), - ], -) -@mock.patch("fivetran_provider_async.hooks.FivetranHookAsync._do_api_call_async") -async def test_fivetran_hook_get_sync_status_async( - mock_api_call_async_response, mock_previous_completed_at, expected_result -): - """Tests that get_sync_status_async method return success or pending depending on whether - current_completed_at > previous_completed_at""" - hook = FivetranHookAsync(fivetran_conn_id="conn_fivetran") - mock_api_call_async_response.return_value = MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS - result = await hook.get_sync_status_async( - connector_id="interchangeable_revenge", - previous_completed_at=mock_previous_completed_at, - reschedule_wait_time=60, +MOCK_FIVETRAN_RESPONSE_PAYLOAD = { + "code": "Success", + "data": { + "id": "interchangeable_revenge", + "group_id": "rarer_gradient", + "service": "google_sheets", + "service_version": 1, + "schema": "google_sheets.fivetran_google_sheets_spotify", + "connected_by": "mournful_shalt", + "created_at": "2021-03-05T22:58:56.238875Z", + "succeeded_at": "2021-03-23T20:55:12.670390Z", + "failed_at": "null", + "paused": False, + "pause_after_trial": False, + "sync_frequency": 360, + "schedule_type": "manual", + "status": { + "setup_state": "connected", + "sync_state": "scheduled", + "update_state": "on_schedule", + "is_historical_sync": False, + "tasks": [], + "warnings": [], + }, + "config": { + "latest_version": "1", + "sheet_id": "https://docs.google.com/spreadsheets/d/.../edit#gid=...", + "named_range": "fivetran_test_range", + "authorization_method": "User OAuth", + "service_version": "1", + "last_synced_changes__utc_": "2021-03-23 20:54", + }, + }, +} + +MOCK_FIVETRAN_SCHEMA_RESPONSE_PAYLOAD = { + "code": "Success", + "data": { + "enable_new_by_default": True, + "schema_change_handling": "ALLOW_ALL", + "schemas": { + "google_sheets.fivetran_google_sheets_spotify": { + "name_in_destination": "google_sheets.fivetran_google_sheets_spotify", + "enabled": True, + "tables": { + "table_1": { + "name_in_destination": "table_1", + "enabled": True, + "sync_mode": "SOFT_DELETE", + "enabled_patch_settings": {"allowed": True}, + "columns": { + "column_1": { + "name_in_destination": "column_1", + "enabled": True, + "hashed": False, + "enabled_patch_settings": { + "allowed": False, + "reason_code": "SYSTEM_COLUMN", + "reason": "The column does not support exclusion as it is a Primary Key", + }, + }, + }, + } + }, + } + }, + }, +} + +MOCK_FIVETRAN_METADATA_TABLES_RESPONSE_PAYLOAD = { + "code": "Success", + "data": { + "items": [ + { + "id": "NjgyMDM0OQ", + "parent_id": "ZGVtbw", + "name_in_source": "subscription_periods", + "name_in_destination": "subscription_periods", + } + ] + }, +} + +MOCK_FIVETRAN_METADATA_COLUMNS_RESPONSE_PAYLOAD = { + "code": "Success", + "data": { + "items": [ + { + "id": "MjE0NDM2ODE2", + "parent_id": "NjgyMDM0OQ", + "name_in_source": "_file", + "name_in_destination": "_file", + "type_in_source": "String", + "type_in_destination": "VARCHAR(256)", + "is_primary_key": True, + "is_foreign_key": False, + }, + ] + }, +} + +MOCK_FIVETRAN_DESTINATIONS_RESPONSE_PAYLOAD = { + "code": "Success", + "data": { + "id": "rarer_gradient", + "group_id": "rarer_gradient", + "service": "google_sheets", + "region": "GCP_US_EAST4", + "time_zone_offset": "-8", + "setup_status": "connected", + "config": {"schema": "google_sheets.fivetran_google_sheets_spotify"}, + }, +} + +MOCK_FIVETRAN_GROUPS_RESPONSE_PAYLOAD = { + "code": "Success", + "data": { + "id": "rarer_gradient", + "name": "GoogleSheets", + "created_at": "2022-12-12T17:14:33.790844Z", + }, +} + + +class TestFivetranHookAsync: + @pytest.mark.asyncio + @mock.patch("fivetran_provider_async.hooks.FivetranHookAsync._do_api_call_async") + async def test_fivetran_hook_get_connector_async(self, mock_api_call_async_response): + """Tests that the get_connector_async method fetches the details of a connector""" + hook = FivetranHookAsync(fivetran_conn_id="conn_fivetran") + mock_api_call_async_response.return_value = MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS + result = await hook.get_connector_async(connector_id="interchangeable_revenge") + assert result["status"]["setup_state"] == "connected" + + @pytest.mark.asyncio + @mock.patch("fivetran_provider_async.hooks.FivetranHookAsync._do_api_call_async") + async def test_fivetran_hook_get_connector_async_error(self, mock_api_call_async_response): + """Tests that the get_connector_async method raises exception when connector_id is not specified""" + hook = FivetranHookAsync(fivetran_conn_id="conn_fivetran") + mock_api_call_async_response.return_value = MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS + with pytest.raises(ValueError) as exc: + await hook.get_connector_async(connector_id="") + assert str(exc.value) == "No value specified for connector_id" + + @pytest.mark.asyncio + @pytest.mark.parametrize( + "mock_previous_completed_at, expected_result", + [ + ( + pendulum.datetime(2021, 3, 23), # current_completed_at > previous_completed_at + "success", + ), + ( + pendulum.datetime(2021, 3, 23, 21, 55), # current_completed_at < previous_completed_at + "pending", + ), + ], ) - assert result == expected_result - - -@pytest.mark.asyncio -@pytest.mark.parametrize( - "mock_previous_completed_at, expected_result", - [ - ( - pendulum.datetime(2021, 3, 23), # current_completed_at > previous_completed_at - "success", - ), - ( - pendulum.datetime(2021, 3, 23, 21, 55), # current_completed_at < previous_completed_at - "pending", - ), - ], -) -@mock.patch("fivetran_provider_async.hooks.FivetranHookAsync._do_api_call_async") -async def test_fivetran_hook_get_sync_status_async_with_reschedule_mode_error_for_wait_time( - mock_api_call_async_response, mock_previous_completed_at, expected_result -): - """Tests that get_sync_status_async method return error with rescheduled_for in Fivetran API response - along with schedule_type as manual and negative wait time.""" - hook = FivetranHookAsync(fivetran_conn_id="conn_fivetran") - mock_api_call_async_response.return_value = MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS_RESCHEDULE_MODE - with pytest.raises(ValueError, match="Sync connector manually."): - await hook.get_sync_status_async( + @mock.patch("fivetran_provider_async.hooks.FivetranHookAsync._do_api_call_async") + async def test_fivetran_hook_get_sync_status_async( + self, mock_api_call_async_response, mock_previous_completed_at, expected_result + ): + """Tests that get_sync_status_async method return success or pending depending on whether + current_completed_at > previous_completed_at""" + hook = FivetranHookAsync(fivetran_conn_id="conn_fivetran") + mock_api_call_async_response.return_value = MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS + result = await hook.get_sync_status_async( connector_id="interchangeable_revenge", previous_completed_at=mock_previous_completed_at, + reschedule_wait_time=60, ) - - -@pytest.mark.asyncio -@pytest.mark.parametrize( - "mock_previous_completed_at, expected_result", - [ - ( - pendulum.datetime(2021, 3, 23), # current_completed_at > previous_completed_at - "success", - ), - ( - pendulum.datetime(2021, 3, 23, 21, 55), # current_completed_at < previous_completed_at - "pending", - ), - ], -) -@mock.patch("fivetran_provider_async.hooks.FivetranHookAsync._do_api_call_async") -@mock.patch("fivetran_provider_async.hooks.FivetranHookAsync.start_fivetran_sync") -async def test_fivetran_hook_get_sync_status_async_with_reschedule_mode( - mock_start_fivetran_sync, mock_api_call_async_response, mock_previous_completed_at, expected_result -): - """Tests that get_sync_status_async method return success or pending depending on whether - current_completed_at > previous_completed_at with reschedule_time specified by user and - schedule_type as manual in API response.""" - mock_start_fivetran_sync.return_value = pendulum.datetime(2021, 3, 21, 21, 55) - hook = FivetranHookAsync(fivetran_conn_id="conn_fivetran") - mock_api_call_async_response.return_value = MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS_RESCHEDULE_MODE - result = await hook.get_sync_status_async( - connector_id="interchangeable_revenge", - previous_completed_at=mock_previous_completed_at, - reschedule_wait_time=10, + assert result == expected_result + + @pytest.mark.asyncio + @pytest.mark.parametrize( + "mock_previous_completed_at, expected_result", + [ + ( + pendulum.datetime(2021, 3, 23), # current_completed_at > previous_completed_at + "success", + ), + ( + pendulum.datetime(2021, 3, 23, 21, 55), # current_completed_at < previous_completed_at + "pending", + ), + ], ) - - assert result == expected_result - - -@pytest.mark.asyncio -@pytest.mark.parametrize( - "mock_previous_completed_at, expected_result", - [ - ( - pendulum.datetime(2021, 3, 23), # current_completed_at > previous_completed_at - "success", - ), - ( - pendulum.datetime(2021, 3, 23, 21, 55), # current_completed_at < previous_completed_at - "pending", - ), - ], -) -@mock.patch("fivetran_provider_async.hooks.FivetranHookAsync._do_api_call_async") -@mock.patch("fivetran_provider_async.hooks.FivetranHookAsync.start_fivetran_sync") -async def test_fivetran_hook_get_sync_status_async_with_reschedule_for_and_schedule_type_manual( - mock_start_fivetran_sync, mock_api_call_async_response, mock_previous_completed_at, expected_result -): - """Tests that get_sync_status_async method return success or pending depending on whether - current_completed_at > previous_completed_at with reschedule_for in Fivetran API response - along with schedule_type as manual.""" - mock_start_fivetran_sync.return_value = pendulum.datetime(2021, 3, 21, 21, 55) - hook = FivetranHookAsync(fivetran_conn_id="conn_fivetran") - mock_api_call_async_response.return_value = MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS_WITH_RESCHEDULE_FOR - result = await hook.get_sync_status_async( - connector_id="interchangeable_revenge", - previous_completed_at=mock_previous_completed_at, + @mock.patch("fivetran_provider_async.hooks.FivetranHookAsync._do_api_call_async") + async def test_fivetran_hook_get_sync_status_async_with_reschedule_mode_error_for_wait_time( + self, mock_api_call_async_response, mock_previous_completed_at, expected_result + ): + """Tests that get_sync_status_async method return error with rescheduled_for in Fivetran API response + along with schedule_type as manual and negative wait time.""" + hook = FivetranHookAsync(fivetran_conn_id="conn_fivetran") + mock_api_call_async_response.return_value = MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS_RESCHEDULE_MODE + with pytest.raises(ValueError, match="Sync connector manually."): + await hook.get_sync_status_async( + connector_id="interchangeable_revenge", + previous_completed_at=mock_previous_completed_at, + ) + + @pytest.mark.asyncio + @pytest.mark.parametrize( + "mock_previous_completed_at, expected_result", + [ + ( + pendulum.datetime(2021, 3, 23), # current_completed_at > previous_completed_at + "success", + ), + ( + pendulum.datetime(2021, 3, 23, 21, 55), # current_completed_at < previous_completed_at + "pending", + ), + ], ) - - assert result == expected_result - - -@pytest.mark.asyncio -@mock.patch("fivetran_provider_async.hooks.FivetranHookAsync._do_api_call_async") -async def test_fivetran_hook_get_sync_status_async_exception(mock_api_call_async_response): - """Tests that get_sync_status_async method raises exception when failed_at > previous_completed_at""" - mock_previous_completed_at = pendulum.datetime(2021, 3, 21, 21, 55) - hook = FivetranHookAsync(fivetran_conn_id="conn_fivetran") - mock_api_call_async_response.return_value = MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS - - with pytest.raises(AirflowException) as exc: - await hook.get_sync_status_async( - connector_id="interchangeable_revenge", previous_completed_at=mock_previous_completed_at + @mock.patch("fivetran_provider_async.hooks.FivetranHookAsync._do_api_call_async") + @mock.patch("fivetran_provider_async.hooks.FivetranHookAsync.start_fivetran_sync") + async def test_fivetran_hook_get_sync_status_async_with_reschedule_mode( + self, + mock_start_fivetran_sync, + mock_api_call_async_response, + mock_previous_completed_at, + expected_result, + ): + """Tests that get_sync_status_async method return success or pending depending on whether + current_completed_at > previous_completed_at with reschedule_time specified by user and + schedule_type as manual in API response.""" + mock_start_fivetran_sync.return_value = pendulum.datetime(2021, 3, 21, 21, 55) + hook = FivetranHookAsync(fivetran_conn_id="conn_fivetran") + mock_api_call_async_response.return_value = MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS_RESCHEDULE_MODE + result = await hook.get_sync_status_async( + connector_id="interchangeable_revenge", + previous_completed_at=mock_previous_completed_at, + reschedule_wait_time=10, ) - assert "Fivetran sync for connector interchangeable_revenge failed" in str(exc.value) - -@pytest.mark.asyncio -@mock.patch("fivetran_provider_async.hooks.FivetranHookAsync.start_fivetran_sync") -@mock.patch("fivetran_provider_async.hooks.FivetranHookAsync._do_api_call_async") -async def test_fivetran_hook_pause_and_restart(mock_api_call_async_response, mock_start_fivetran_sync): - """Tests that pause_and_restart method for manual mode with reschedule time set.""" - hook = FivetranHookAsync(fivetran_conn_id="conn_fivetran") - mock_start_fivetran_sync.return_value = True - mock_api_call_async_response.return_value = MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS - - result = hook.pause_and_restart( - connector_id="interchangeable_revenge", reschedule_for="manual", reschedule_wait_time=60 + assert result == expected_result + + @pytest.mark.asyncio + @pytest.mark.parametrize( + "mock_previous_completed_at, expected_result", + [ + ( + pendulum.datetime(2021, 3, 23), # current_completed_at > previous_completed_at + "success", + ), + ( + pendulum.datetime(2021, 3, 23, 21, 55), # current_completed_at < previous_completed_at + "pending", + ), + ], ) - assert result is True - - -@pytest.mark.asyncio -@mock.patch("fivetran_provider_async.hooks.FivetranHookAsync._do_api_call_async") -async def test_fivetran_hook_get_last_sync_async_no_xcom(mock_api_call_async_response): - """Tests that the get_last_sync_async method returns the last time Fivetran connector - completed a sync""" - hook = FivetranHookAsync(fivetran_conn_id="conn_fivetran") - mock_api_call_async_response.return_value = MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS - result = await hook.get_last_sync_async(connector_id="interchangeable_revenge") - assert result == pendulum.parse(MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS["data"]["succeeded_at"]) - - -@pytest.mark.asyncio -@mock.patch("fivetran_provider_async.hooks.FivetranHookAsync._do_api_call_async") -async def test_fivetran_hook_get_last_sync_async_with_xcom(mock_api_call_async_response): - """Tests that the get_last_sync_async method returns the last time Fivetran connector - completed a sync when xcom is passed""" - XCOM = "2021-03-22T20:55:12.670390Z" - hook = FivetranHookAsync(fivetran_conn_id="conn_fivetran") - mock_api_call_async_response.return_value = MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS - result = await hook.get_last_sync_async(connector_id="interchangeable_revenge", xcom=XCOM) - assert result == pendulum.parse(XCOM) - - -@pytest.mark.asyncio -@mock.patch("fivetran_provider_async.hooks.aiohttp.ClientSession") -@mock.patch("fivetran_provider_async.hooks.FivetranHookAsync.get_connection") -async def test_do_api_call_async_get_method_with_success(mock_get_connection, mock_session): - """Tests that _do_api_call_async method returns correct response when GET request - is successful""" - - async def mock_fun(arg1, arg2, arg3, arg4): - return {"status": "success"} - - mock_session.return_value.__aexit__.return_value = mock_fun - mock_session.return_value.__aenter__.return_value.get.return_value.json.return_value = { - "status": "success" - } - - hook = FivetranHookAsync(fivetran_conn_id="conn_fivetran") - - hook.fivetran_conn = mock_get_connection - hook.fivetran_conn.login = LOGIN - hook.fivetran_conn.password = PASSWORD - response = await hook._do_api_call_async(("GET", "v1/connectors/test")) - assert response == {"status": "success"} - - -@pytest.mark.asyncio -@mock.patch("fivetran_provider_async.hooks.aiohttp.ClientSession") -@mock.patch("fivetran_provider_async.hooks.FivetranHookAsync.get_connection") -async def test_do_api_call_async_patch_method_with_success(mock_get_connection, mock_session): - """Tests that _do_api_call_async method returns correct response when PATCH request - is successful""" - - async def mock_fun(arg1, arg2, arg3, arg4): - return {"status": "success"} - - mock_session.return_value.__aexit__.return_value = mock_fun - mock_session.return_value.__aenter__.return_value.patch.return_value.json.return_value = { - "status": "success" - } - - hook = FivetranHookAsync(fivetran_conn_id="conn_fivetran") - - hook.fivetran_conn = mock_get_connection - hook.fivetran_conn.login = LOGIN - hook.fivetran_conn.password = PASSWORD - response = await hook._do_api_call_async(("PATCH", "v1/connectors/test")) - assert response == {"status": "success"} - - -@pytest.mark.asyncio -@mock.patch("fivetran_provider_async.hooks.aiohttp.ClientSession") -@mock.patch("fivetran_provider_async.hooks.FivetranHookAsync.get_connection") -async def test_do_api_call_async_post_method_with_success(mock_get_connection, mock_session): - """Tests that _do_api_call_async method returns correct response when POST request - is successful""" - - async def mock_fun(arg1, arg2, arg3, arg4): - return {"status": "success"} - - mock_session.return_value.__aexit__.return_value = mock_fun - mock_session.return_value.__aenter__.return_value.post.return_value.json.return_value = { - "status": "success" - } - - hook = FivetranHookAsync(fivetran_conn_id="conn_fivetran") - - hook.fivetran_conn = mock_get_connection - hook.fivetran_conn.login = LOGIN - hook.fivetran_conn.password = PASSWORD - response = await hook._do_api_call_async(("POST", "v1/connectors/test")) - assert response == {"status": "success"} - - -@pytest.mark.asyncio -@mock.patch("fivetran_provider_async.hooks.aiohttp.ClientSession") -@mock.patch("fivetran_provider_async.hooks.FivetranHookAsync.get_connection") -async def test_do_api_call_async_unexpected_method_error(mock_get_connection, mock_session): - """Tests that _do_api_call_async method raises exception when a wrong request is sent""" - hook = FivetranHookAsync(fivetran_conn_id="conn_fivetran") - - hook.fivetran_conn = mock_get_connection - hook.fivetran_conn.login = LOGIN - hook.fivetran_conn.password = PASSWORD - with pytest.raises(AirflowException) as exc: - await hook._do_api_call_async(("UNKNOWN", "v1/connectors/test")) - assert str(exc.value) == "Unexpected HTTP Method: UNKNOWN" - - -@pytest.mark.asyncio -@mock.patch("fivetran_provider_async.hooks.aiohttp.ClientSession") -@mock.patch("fivetran_provider_async.hooks.FivetranHookAsync.get_connection") -async def test_do_api_call_async_with_non_retryable_client_response_error(mock_get_connection, mock_session): - """Tests that _do_api_call_async method returns expected response for a non retryable error""" - mock_session.return_value.__aenter__.return_value.patch.return_value.json.side_effect = ( - ClientResponseError( - request_info=RequestInfo(url="example.com", method="PATCH", headers=multidict.CIMultiDict()), - status=400, - message="test message", - history=[], + @mock.patch("fivetran_provider_async.hooks.FivetranHookAsync._do_api_call_async") + @mock.patch("fivetran_provider_async.hooks.FivetranHookAsync.start_fivetran_sync") + async def test_fivetran_hook_get_sync_status_async_with_reschedule_for_and_schedule_type_manual( + self, + mock_start_fivetran_sync, + mock_api_call_async_response, + mock_previous_completed_at, + expected_result, + ): + """Tests that get_sync_status_async method return success or pending depending on whether + current_completed_at > previous_completed_at with reschedule_for in Fivetran API response + along with schedule_type as manual.""" + mock_start_fivetran_sync.return_value = pendulum.datetime(2021, 3, 21, 21, 55) + hook = FivetranHookAsync(fivetran_conn_id="conn_fivetran") + mock_api_call_async_response.return_value = MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS_WITH_RESCHEDULE_FOR + result = await hook.get_sync_status_async( + connector_id="interchangeable_revenge", + previous_completed_at=mock_previous_completed_at, + ) + + assert result == expected_result + + @pytest.mark.asyncio + @mock.patch("fivetran_provider_async.hooks.FivetranHookAsync._do_api_call_async") + async def test_fivetran_hook_get_sync_status_async_exception(self, mock_api_call_async_response): + """Tests that get_sync_status_async method raises exception when failed_at > previous_completed_at""" + mock_previous_completed_at = pendulum.datetime(2021, 3, 21, 21, 55) + hook = FivetranHookAsync(fivetran_conn_id="conn_fivetran") + mock_api_call_async_response.return_value = MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS + + with pytest.raises(AirflowException) as exc: + await hook.get_sync_status_async( + connector_id="interchangeable_revenge", previous_completed_at=mock_previous_completed_at + ) + assert "Fivetran sync for connector interchangeable_revenge failed" in str(exc.value) + + @pytest.mark.asyncio + @mock.patch("fivetran_provider_async.hooks.FivetranHookAsync.start_fivetran_sync") + @mock.patch("fivetran_provider_async.hooks.FivetranHookAsync._do_api_call_async") + async def test_fivetran_hook_pause_and_restart( + self, mock_api_call_async_response, mock_start_fivetran_sync + ): + """Tests that pause_and_restart method for manual mode with reschedule time set.""" + hook = FivetranHookAsync(fivetran_conn_id="conn_fivetran") + mock_start_fivetran_sync.return_value = True + mock_api_call_async_response.return_value = MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS + + result = hook.pause_and_restart( + connector_id="interchangeable_revenge", reschedule_for="manual", reschedule_wait_time=60 + ) + assert result is True + + @pytest.mark.asyncio + @mock.patch("fivetran_provider_async.hooks.FivetranHookAsync._do_api_call_async") + async def test_fivetran_hook_get_last_sync_async_no_xcom(self, mock_api_call_async_response): + """Tests that the get_last_sync_async method returns the last time Fivetran connector + completed a sync""" + hook = FivetranHookAsync(fivetran_conn_id="conn_fivetran") + mock_api_call_async_response.return_value = MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS + result = await hook.get_last_sync_async(connector_id="interchangeable_revenge") + assert result == pendulum.parse(MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS["data"]["succeeded_at"]) + + @pytest.mark.asyncio + @mock.patch("fivetran_provider_async.hooks.FivetranHookAsync._do_api_call_async") + async def test_fivetran_hook_get_last_sync_async_with_xcom(self, mock_api_call_async_response): + """Tests that the get_last_sync_async method returns the last time Fivetran connector + completed a sync when xcom is passed""" + XCOM = "2021-03-22T20:55:12.670390Z" + hook = FivetranHookAsync(fivetran_conn_id="conn_fivetran") + mock_api_call_async_response.return_value = MOCK_FIVETRAN_RESPONSE_PAYLOAD_SHEETS + result = await hook.get_last_sync_async(connector_id="interchangeable_revenge", xcom=XCOM) + assert result == pendulum.parse(XCOM) + + @pytest.mark.asyncio + @mock.patch("fivetran_provider_async.hooks.aiohttp.ClientSession") + @mock.patch("fivetran_provider_async.hooks.FivetranHookAsync.get_connection") + async def test_do_api_call_async_get_method_with_success(self, mock_get_connection, mock_session): + """Tests that _do_api_call_async method returns correct response when GET request + is successful""" + + async def mock_fun(arg1, arg2, arg3, arg4): + return {"status": "success"} + + mock_session.return_value.__aexit__.return_value = mock_fun + mock_session.return_value.__aenter__.return_value.get.return_value.json.return_value = { + "status": "success" + } + + hook = FivetranHookAsync(fivetran_conn_id="conn_fivetran") + + hook.fivetran_conn = mock_get_connection + hook.fivetran_conn.login = LOGIN + hook.fivetran_conn.password = PASSWORD + response = await hook._do_api_call_async(("GET", "v1/connectors/test")) + assert response == {"status": "success"} + + @pytest.mark.asyncio + @mock.patch("fivetran_provider_async.hooks.aiohttp.ClientSession") + @mock.patch("fivetran_provider_async.hooks.FivetranHookAsync.get_connection") + async def test_do_api_call_async_patch_method_with_success(self, mock_get_connection, mock_session): + """Tests that _do_api_call_async method returns correct response when PATCH request + is successful""" + + async def mock_fun(arg1, arg2, arg3, arg4): + return {"status": "success"} + + mock_session.return_value.__aexit__.return_value = mock_fun + mock_session.return_value.__aenter__.return_value.patch.return_value.json.return_value = { + "status": "success" + } + + hook = FivetranHookAsync(fivetran_conn_id="conn_fivetran") + + hook.fivetran_conn = mock_get_connection + hook.fivetran_conn.login = LOGIN + hook.fivetran_conn.password = PASSWORD + response = await hook._do_api_call_async(("PATCH", "v1/connectors/test")) + assert response == {"status": "success"} + + @pytest.mark.asyncio + @mock.patch("fivetran_provider_async.hooks.aiohttp.ClientSession") + @mock.patch("fivetran_provider_async.hooks.FivetranHookAsync.get_connection") + async def test_do_api_call_async_post_method_with_success(self, mock_get_connection, mock_session): + """Tests that _do_api_call_async method returns correct response when POST request + is successful""" + + async def mock_fun(arg1, arg2, arg3, arg4): + return {"status": "success"} + + mock_session.return_value.__aexit__.return_value = mock_fun + mock_session.return_value.__aenter__.return_value.post.return_value.json.return_value = { + "status": "success" + } + + hook = FivetranHookAsync(fivetran_conn_id="conn_fivetran") + + hook.fivetran_conn = mock_get_connection + hook.fivetran_conn.login = LOGIN + hook.fivetran_conn.password = PASSWORD + response = await hook._do_api_call_async(("POST", "v1/connectors/test")) + assert response == {"status": "success"} + + @pytest.mark.asyncio + @mock.patch("fivetran_provider_async.hooks.aiohttp.ClientSession") + @mock.patch("fivetran_provider_async.hooks.FivetranHookAsync.get_connection") + async def test_do_api_call_async_unexpected_method_error(self, mock_get_connection, mock_session): + """Tests that _do_api_call_async method raises exception when a wrong request is sent""" + hook = FivetranHookAsync(fivetran_conn_id="conn_fivetran") + + hook.fivetran_conn = mock_get_connection + hook.fivetran_conn.login = LOGIN + hook.fivetran_conn.password = PASSWORD + with pytest.raises(AirflowException) as exc: + await hook._do_api_call_async(("UNKNOWN", "v1/connectors/test")) + assert str(exc.value) == "Unexpected HTTP Method: UNKNOWN" + + @pytest.mark.asyncio + @mock.patch("fivetran_provider_async.hooks.aiohttp.ClientSession") + @mock.patch("fivetran_provider_async.hooks.FivetranHookAsync.get_connection") + async def test_do_api_call_async_with_non_retryable_client_response_error( + self, mock_get_connection, mock_session + ): + """Tests that _do_api_call_async method returns expected response for a non retryable error""" + mock_session.return_value.__aenter__.return_value.patch.return_value.json.side_effect = ( + ClientResponseError( + request_info=RequestInfo(url="example.com", method="PATCH", headers=multidict.CIMultiDict()), + status=400, + message="test message", + history=[], + ) ) - ) - hook = FivetranHookAsync(fivetran_conn_id="conn_fivetran") + hook = FivetranHookAsync(fivetran_conn_id="conn_fivetran") + + hook.fivetran_conn = mock_get_connection + hook.fivetran_conn.login = LOGIN + hook.fivetran_conn.password = PASSWORD + + resp = await hook._do_api_call_async(("PATCH", "v1/connectors/test")) + assert resp == {"Response": {"test message"}, "Status Code": {400}} + + @pytest.mark.asyncio + @mock.patch("fivetran_provider_async.hooks.aiohttp.ClientSession") + @mock.patch("fivetran_provider_async.hooks.FivetranHookAsync.get_connection") + async def test_do_api_call_async_with_retryable_client_response_error( + self, mock_get_connection, mock_session + ): + """Tests that _do_api_call_async method raises exception for a retryable error""" + mock_session.return_value.__aenter__.return_value.patch.return_value.json.side_effect = ( + ClientResponseError( + request_info=RequestInfo(url="example.com", method="PATCH", headers=multidict.CIMultiDict()), + status=500, + message="test message", + history=[], + ) + ) - hook.fivetran_conn = mock_get_connection - hook.fivetran_conn.login = LOGIN - hook.fivetran_conn.password = PASSWORD + hook = FivetranHookAsync(fivetran_conn_id="conn_fivetran") - resp = await hook._do_api_call_async(("PATCH", "v1/connectors/test")) - assert resp == {"Response": {"test message"}, "Status Code": {400}} + hook.fivetran_conn = mock_get_connection + hook.fivetran_conn.login = LOGIN + hook.fivetran_conn.password = PASSWORD + with pytest.raises(AirflowException) as exc: + await hook._do_api_call_async(("PATCH", "v1/connectors/test")) -@pytest.mark.asyncio -@mock.patch("fivetran_provider_async.hooks.aiohttp.ClientSession") -@mock.patch("fivetran_provider_async.hooks.FivetranHookAsync.get_connection") -async def test_do_api_call_async_with_retryable_client_response_error(mock_get_connection, mock_session): - """Tests that _do_api_call_async method raises exception for a retryable error""" - mock_session.return_value.__aenter__.return_value.patch.return_value.json.side_effect = ( - ClientResponseError( - request_info=RequestInfo(url="example.com", method="PATCH", headers=multidict.CIMultiDict()), - status=500, - message="test message", - history=[], - ) - ) + assert str(exc.value) == "API requests to Fivetran failed 3 times. Giving up." - hook = FivetranHookAsync(fivetran_conn_id="conn_fivetran") - hook.fivetran_conn = mock_get_connection - hook.fivetran_conn.login = LOGIN - hook.fivetran_conn.password = PASSWORD +# Mock the `conn_fivetran` Airflow connection (note the `@` after `API_SECRET`) +@mock.patch.dict("os.environ", AIRFLOW_CONN_CONN_FIVETRAN="http://API_KEY:API_SECRET@") +class TestFivetranHook(unittest.TestCase): + """ + Test functions for Fivetran Hook. - with pytest.raises(AirflowException) as exc: - await hook._do_api_call_async(("PATCH", "v1/connectors/test")) + Mocks responses from Fivetran API. + """ - assert str(exc.value) == "API requests to Fivetran failed 3 times. Giving up." + @requests_mock.mock() + def test_get_connector(self, m): + m.get( + "https://api.fivetran.com/v1/connectors/interchangeable_revenge", + json=MOCK_FIVETRAN_RESPONSE_PAYLOAD, + ) + hook = FivetranHook( + fivetran_conn_id="conn_fivetran", + ) + result = hook.get_connector(connector_id="interchangeable_revenge") + assert result["status"]["setup_state"] == "connected" + + @requests_mock.mock() + def test_get_connector_schemas(self, m): + m.get( + "https://api.fivetran.com/v1/connectors/interchangeable_revenge/schemas", + json=MOCK_FIVETRAN_SCHEMA_RESPONSE_PAYLOAD, + ) + hook = FivetranHook( + fivetran_conn_id="conn_fivetran", + ) + result = hook.get_connector_schemas(connector_id="interchangeable_revenge") + assert result["schemas"]["google_sheets.fivetran_google_sheets_spotify"]["enabled"] + + @requests_mock.mock() + def test_get_metadata_tables(self, m): + m.get( + "https://api.fivetran.com/v1/metadata/connectors/interchangeable_revenge/tables", + json=MOCK_FIVETRAN_METADATA_TABLES_RESPONSE_PAYLOAD, + ) + hook = FivetranHook( + fivetran_conn_id="conn_fivetran", + ) + result = hook.get_metadata(connector_id="interchangeable_revenge", metadata="tables") + assert result["items"][0]["id"] == "NjgyMDM0OQ" + + @requests_mock.mock() + def test_get_metadata_columns(self, m): + m.get( + "https://api.fivetran.com/v1/metadata/connectors/interchangeable_revenge/columns", + json=MOCK_FIVETRAN_METADATA_COLUMNS_RESPONSE_PAYLOAD, + ) + hook = FivetranHook( + fivetran_conn_id="conn_fivetran", + ) + result = hook.get_metadata(connector_id="interchangeable_revenge", metadata="columns") + assert result["items"][0]["id"] == "MjE0NDM2ODE2" + + @requests_mock.mock() + def test_get_destinations(self, m): + m.get( + "https://api.fivetran.com/v1/destinations/rarer_gradient", + json=MOCK_FIVETRAN_DESTINATIONS_RESPONSE_PAYLOAD, + ) + hook = FivetranHook( + fivetran_conn_id="conn_fivetran", + ) + result = hook.get_destinations(group_id="rarer_gradient") + assert result["service"] == "google_sheets" + + @requests_mock.mock() + def test_get_groups(self, m): + m.get( + "https://api.fivetran.com/v1/groups/rarer_gradient", + json=MOCK_FIVETRAN_GROUPS_RESPONSE_PAYLOAD, + ) + hook = FivetranHook( + fivetran_conn_id="conn_fivetran", + ) + result = hook.get_groups(group_id="rarer_gradient") + assert result["id"] == "rarer_gradient" + assert result["name"] == "GoogleSheets" + + @requests_mock.mock() + def test_start_fivetran_sync(self, m): + m.get( + "https://api.fivetran.com/v1/connectors/interchangeable_revenge", + json=MOCK_FIVETRAN_RESPONSE_PAYLOAD, + ) + m.post( + "https://api.fivetran.com/v1/connectors/interchangeable_revenge/force", + json=MOCK_FIVETRAN_RESPONSE_PAYLOAD, + ) + hook = FivetranHook( + fivetran_conn_id="conn_fivetran", + ) + result = hook.start_fivetran_sync(connector_id="interchangeable_revenge") + assert result is not None diff --git a/tests/operators/test_fivetran.py b/tests/operators/test_fivetran.py index 537c456..ddba1f7 100644 --- a/tests/operators/test_fivetran.py +++ b/tests/operators/test_fivetran.py @@ -1,3 +1,4 @@ +import logging import unittest from unittest import mock @@ -5,7 +6,7 @@ import requests_mock from airflow.exceptions import AirflowException, TaskDeferred -from fivetran_provider_async.operators import FivetranOperatorAsync +from fivetran_provider_async.operators import FivetranOperator, FivetranOperatorAsync from tests.common.static import ( MOCK_FIVETRAN_DESTINATIONS_RESPONSE_PAYLOAD_SHEETS, MOCK_FIVETRAN_GROUPS_RESPONSE_PAYLOAD_SHEETS, @@ -15,6 +16,43 @@ MOCK_FIVETRAN_SCHEMA_RESPONSE_PAYLOAD_SHEETS, ) +log = logging.getLogger(__name__) + + +MOCK_FIVETRAN_RESPONSE_PAYLOAD = { + "code": "Success", + "data": { + "id": "interchangeable_revenge", + "group_id": "rarer_gradient", + "service": "google_sheets", + "service_version": 1, + "schema": "google_sheets.fivetran_google_sheets_spotify", + "connected_by": "mournful_shalt", + "created_at": "2021-03-05T22:58:56.238875Z", + "succeeded_at": "2021-03-23T20:55:12.670390Z", + "failed_at": "null", + "paused": False, + "sync_frequency": 360, + "schedule_type": "manual", + "status": { + "setup_state": "connected", + "sync_state": "scheduled", + "update_state": "on_schedule", + "is_historical_sync": False, + "tasks": [], + "warnings": [], + }, + "config": { + "latest_version": "1", + "sheet_id": "https://docs.google.com/spreadsheets/d/.../edit#gid=...", + "named_range": "fivetran_test_range", + "authorization_method": "User OAuth", + "service_version": "1", + "last_synced_changes__utc_": "2021-03-23 20:54", + }, + }, +} + @pytest.fixture def context(): @@ -26,7 +64,7 @@ def context(): @mock.patch.dict("os.environ", AIRFLOW_CONN_CONN_FIVETRAN="http://API_KEY:API_SECRET@") -class TestFivetranOperator(unittest.TestCase): +class TestFivetranOperatorAsync(unittest.TestCase): @requests_mock.mock() def test_fivetran_op_async_execute_success(self, m): """Tests that task gets deferred after job submission""" @@ -147,3 +185,39 @@ def test_fivetran_operator_get_openlineage_facets_on_start(self, m): assert schema_field.name == "column_1_dest" assert schema_field.type == "VARCHAR(256)" assert schema_field.description is None + + +# Mock the `conn_fivetran` Airflow connection (note the `@` after `API_SECRET`) +@mock.patch.dict("os.environ", AIRFLOW_CONN_CONN_FIVETRAN="http://API_KEY:API_SECRET@") +class TestFivetranOperator(unittest.TestCase): + """ + Test functions for Fivetran Operator. + + Mocks responses from Fivetran API. + """ + + @requests_mock.mock() + def test_fivetran_operator(self, m): + m.get( + "https://api.fivetran.com/v1/connectors/interchangeable_revenge", + json=MOCK_FIVETRAN_RESPONSE_PAYLOAD, + ) + m.patch( + "https://api.fivetran.com/v1/connectors/interchangeable_revenge", + json=MOCK_FIVETRAN_RESPONSE_PAYLOAD, + ) + m.post( + "https://api.fivetran.com/v1/connectors/interchangeable_revenge/force", + json=MOCK_FIVETRAN_RESPONSE_PAYLOAD, + ) + + operator = FivetranOperator( + task_id="fivetran-task", + fivetran_conn_id="conn_fivetran", + connector_id="interchangeable_revenge", + ) + + result = operator.execute({}) + log.info(result) + + assert result is not None diff --git a/tests/sensors/test_fivetran.py b/tests/sensors/test_fivetran.py index 64e14e2..74079e8 100644 --- a/tests/sensors/test_fivetran.py +++ b/tests/sensors/test_fivetran.py @@ -1,14 +1,52 @@ +import logging +import unittest from unittest import mock import pytest +import requests_mock from airflow.exceptions import AirflowException, TaskDeferred -from fivetran_provider_async.sensors import FivetranSensorAsync +from fivetran_provider_async.sensors import FivetranSensor, FivetranSensorAsync from fivetran_provider_async.triggers import FivetranTrigger TASK_ID = "fivetran_sensor_check" POLLING_PERIOD_SECONDS = 1.0 +log = logging.getLogger(__name__) + +MOCK_FIVETRAN_RESPONSE_PAYLOAD = { + "code": "Success", + "data": { + "id": "interchangeable_revenge", + "group_id": "rarer_gradient", + "service": "google_sheets", + "service_version": 1, + "schema": "google_sheets.fivetran_google_sheets_spotify", + "connected_by": "mournful_shalt", + "created_at": "2021-03-05T22:58:56.238875Z", + "succeeded_at": "2021-03-23T20:55:12.670390Z", + "failed_at": "null", + "sync_frequency": 360, + "schedule_type": "manual", + "status": { + "setup_state": "connected", + "sync_state": "scheduled", + "update_state": "on_schedule", + "is_historical_sync": False, + "tasks": [], + "warnings": [], + }, + "config": { + "latest_version": "1", + "sheet_id": "https://docs.google.com/spreadsheets/d/.../edit#gid=...", + "named_range": "fivetran_test_range", + "authorization_method": "User OAuth", + "service_version": "1", + "last_synced_changes__utc_": "2021-03-23 20:54", + }, + }, +} + @pytest.fixture def context(): @@ -19,60 +57,80 @@ def context(): yield context -def test_fivetran_sensor_async(): - """Asserts that a task is deferred and a FivetranTrigger will be fired - when the FivetranSensorAsync is executed.""" - task = FivetranSensorAsync( - task_id=TASK_ID, - fivetran_conn_id="fivetran_default", - connector_id="test_connector", - poke_interval=5, - ) - with pytest.raises(TaskDeferred) as exc: - task.execute(context) - assert isinstance(exc.value.trigger, FivetranTrigger), "Trigger is not a FivetranTrigger" - - -def test_fivetran_sensor_async_with_response_wait_time(): - """Asserts that a task is deferred and a FivetranTrigger will be fired - when the FivetranSensorAsync is executed when reschedule_wait_time is specified.""" - task = FivetranSensorAsync( - task_id=TASK_ID, - fivetran_conn_id="fivetran_default", - connector_id="test_connector", - poke_interval=5, - reschedule_wait_time=60, - ) - with pytest.raises(TaskDeferred) as exc: - task.execute(context) - assert isinstance(exc.value.trigger, FivetranTrigger), "Trigger is not a FivetranTrigger" - - -def test_fivetran_sensor_async_execute_failure(context): - """Tests that an AirflowException is raised in case of error event""" - task = FivetranSensorAsync( - task_id=TASK_ID, - fivetran_conn_id="fivetran_default", - connector_id="test_connector", - poke_interval=5, - ) - with pytest.raises(AirflowException) as exc: - task.execute_complete( - context=None, event={"status": "error", "message": "Fivetran connector sync failure"} +class TestFivetranSensorAsync: + def test_fivetran_sensor_async(self): + """Asserts that a task is deferred and a FivetranTrigger will be fired + when the FivetranSensorAsync is executed.""" + task = FivetranSensorAsync( + task_id=TASK_ID, + fivetran_conn_id="fivetran_default", + connector_id="test_connector", + poke_interval=5, + ) + with pytest.raises(TaskDeferred) as exc: + task.execute(context) + assert isinstance(exc.value.trigger, FivetranTrigger), "Trigger is not a FivetranTrigger" + + def test_fivetran_sensor_async_with_response_wait_time(self): + """Asserts that a task is deferred and a FivetranTrigger will be fired + when the FivetranSensorAsync is executed when reschedule_wait_time is specified.""" + task = FivetranSensorAsync( + task_id=TASK_ID, + fivetran_conn_id="fivetran_default", + connector_id="test_connector", + poke_interval=5, + reschedule_wait_time=60, + ) + with pytest.raises(TaskDeferred) as exc: + task.execute(context) + assert isinstance(exc.value.trigger, FivetranTrigger), "Trigger is not a FivetranTrigger" + + def test_fivetran_sensor_async_execute_failure(self, context): + """Tests that an AirflowException is raised in case of error event""" + task = FivetranSensorAsync( + task_id=TASK_ID, + fivetran_conn_id="fivetran_default", + connector_id="test_connector", + poke_interval=5, ) - assert str(exc.value) == "error: Fivetran connector sync failure" - - -def test_fivetran_sensor_async_execute_complete(): - """Asserts that logging occurs as expected""" - task = FivetranSensorAsync( - task_id=TASK_ID, - fivetran_conn_id="fivetran_default", - connector_id="test_connector", - poke_interval=5, - ) - with mock.patch.object(task.log, "info") as mock_log_info: - task.execute_complete( - context=None, event={"status": "success", "message": "Fivetran connector finished syncing"} + with pytest.raises(AirflowException) as exc: + task.execute_complete( + context=None, event={"status": "error", "message": "Fivetran connector sync failure"} + ) + assert str(exc.value) == "error: Fivetran connector sync failure" + + def test_fivetran_sensor_async_execute_complete(self): + """Asserts that logging occurs as expected""" + task = FivetranSensorAsync( + task_id=TASK_ID, + fivetran_conn_id="fivetran_default", + connector_id="test_connector", + poke_interval=5, ) - mock_log_info.assert_called_with("Fivetran connector finished syncing") + with mock.patch.object(task.log, "info") as mock_log_info: + task.execute_complete( + context=None, event={"status": "success", "message": "Fivetran connector finished syncing"} + ) + mock_log_info.assert_called_with("Fivetran connector finished syncing") + + +# Mock the `conn_fivetran` Airflow connection (note the `@` after `API_SECRET`) +@mock.patch.dict("os.environ", AIRFLOW_CONN_CONN_FIVETRAN="http://API_KEY:API_SECRET@") +class TestFivetranSensor(unittest.TestCase): + """ + Test functions for Fivetran Operator. + + Mocks responses from Fivetran API. + """ + + @mock.patch.object(FivetranSensor, "poke", "returned_sync_status") + @requests_mock.mock() + def test_del(self, m): + sensor = FivetranSensor( + task_id="my_fivetran_sensor", + fivetran_conn_id="conn_fivetran", + connector_id="interchangeable_revenge", + ) + + log.info(sensor.poke) + assert sensor.poke == "returned_sync_status"