Move fivetran sync provider code to this repo (#33)
* Move fivetran sync provider code to this repo

Move code from https://github.com/fivetran/airflow-provider-fivetran to this repo
pankajastro authored Jul 11, 2023
1 parent 7193365 commit 3294955
Showing 13 changed files with 1,705 additions and 382 deletions.
34 changes: 34 additions & 0 deletions fivetran_provider_async/example_dags/example_fivetran.py
@@ -0,0 +1,34 @@
from datetime import datetime, timedelta

from airflow import DAG

from fivetran_provider_async.operators import FivetranOperator
from fivetran_provider_async.sensors import FivetranSensor

default_args = {
"owner": "Airflow",
"start_date": datetime(2021, 4, 6),
}

dag = DAG(
dag_id="example_fivetran",
default_args=default_args,
schedule_interval=timedelta(days=1),
catchup=False,
)

with dag:
fivetran_sync_start = FivetranOperator(
task_id="fivetran-task",
fivetran_conn_id="fivetran_default",
connector_id="{{ var.value.connector_id }}",
)

fivetran_sync_wait = FivetranSensor(
task_id="fivetran-sensor",
fivetran_conn_id="fivetran_default",
connector_id="{{ var.value.connector_id }}",
poke_interval=5,
)

fivetran_sync_start >> fivetran_sync_wait
@@ -1,9 +1,8 @@
from datetime import datetime, timedelta

from airflow import DAG
from fivetran_provider.operators.fivetran import FivetranOperator

from fivetran_provider_async.operators import FivetranOperatorAsync
from fivetran_provider_async.operators import FivetranOperator, FivetranOperatorAsync
from fivetran_provider_async.sensors import FivetranSensorAsync

default_args = {
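The hunk above captures the user-facing effect of this commit: the operator class that previously had to be imported from the separate fivetran_provider package now ships from this package, alongside the Async classes (which, as their names suggest, appear to be the deferrable variants). A minimal before/after sketch of the imports:

# Before this commit, two packages were needed:
# from fivetran_provider.operators.fivetran import FivetranOperator
# from fivetran_provider_async.operators import FivetranOperatorAsync

# After this commit, everything imports from fivetran_provider_async:
from fivetran_provider_async.operators import FivetranOperator, FivetranOperatorAsync
from fivetran_provider_async.sensors import FivetranSensor, FivetranSensorAsync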
88 changes: 88 additions & 0 deletions fivetran_provider_async/example_dags/example_fivetran_bigquery.py
@@ -0,0 +1,88 @@
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryValueCheckOperator
from airflow.providers.google.cloud.sensors.bigquery import BigQueryTableExistenceSensor

from fivetran_provider_async.operators import FivetranOperator
from fivetran_provider_async.sensors import FivetranSensor

TABLE = "forestfires"
DATASET = "google_sheets"
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
"owner": "astronomer",
"depends_on_past": False,
"start_date": datetime(2021, 7, 7),
"email": ["[email protected]"],
"email_on_failure": False,
}

with DAG(
"example_fivetran_bigquery",
default_args=default_args,
description="",
schedule_interval=None,
catchup=False,
) as dag:
"""
### Simple EL Pipeline with Data Integrity and Quality Checks
Before running the DAG, set the following as Airflow Variables or environment variables:
- key: gcp_project_id
value: [gcp_project_id]
- key: connector_id
value: [connector_id]
Fully replace [gcp_project_id] and [connector_id] with the actual IDs.
What makes this a simple data quality case is:
1. Absolute ground truth: the local CSV file is considered perfect and immutable.
2. No transformations or business logic.
3. Exact values of data to quality check are known.
"""

"""
#### FivetranOperator & FivetranSensor
Calling Fivetran to begin data movement from Google Sheets to BigQuery.
The FivetranSensor monitors the status of the Fivetran data sync.
"""
fivetran_sync_start = FivetranOperator(
task_id="fivetran-task",
fivetran_conn_id="fivetran_default",
connector_id="{{ var.value.connector_id }}",
)

fivetran_sync_wait = FivetranSensor(
task_id="fivetran-sensor",
fivetran_conn_id="fivetran_default",
connector_id="{{ var.value.connector_id }}",
poke_interval=5,
)

"""
#### BigQuery row validation task
Ensure that data was copied to BigQuery correctly, i.e. the table and dataset
exist.
"""
validate_bigquery = BigQueryTableExistenceSensor(
task_id="validate_bigquery",
project_id="{{ var.value.gcp_project_id }}",
dataset_id=DATASET,
table_id="forestfires",
)

"""
#### Row-level data quality check
Run a data quality check on a few rows, ensuring that the data in BigQuery
matches the ground truth in the corresponding JSON file.
"""
check_bq_row_count = BigQueryValueCheckOperator(
task_id="check_row_count",
sql=f"SELECT COUNT(*) FROM {DATASET}.{TABLE}",
pass_value=516,
use_legacy_sql=False,
)

done = EmptyOperator(task_id="done")

fivetran_sync_start >> fivetran_sync_wait >> validate_bigquery
validate_bigquery >> check_bq_row_count >> done
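The docstring above expects two Airflow Variables to exist before the DAG runs. A minimal setup sketch, assuming the values are set from a one-off Python session rather than the Airflow UI (both values shown are placeholders):

from airflow.models import Variable

# Placeholders; substitute the real GCP project id and the connector id
# shown in the Fivetran dashboard.
Variable.set("gcp_project_id", "my-gcp-project")
Variable.set("connector_id", "my_fivetran_connector_id")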
127 changes: 127 additions & 0 deletions fivetran_provider_async/example_dags/example_fivetran_bqml.py
@@ -0,0 +1,127 @@
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import BranchPythonOperator
from airflow.providers.google.cloud.operators.bigquery import (
BigQueryExecuteQueryOperator,
BigQueryGetDataOperator,
)
from airflow.providers.ssh.operators.ssh import SSHOperator

from fivetran_provider_async.operators import FivetranOperator
from fivetran_provider_async.sensors import FivetranSensor

# EDIT WITH YOUR PROJECT ID & DATASET NAME
PROJECT_ID = "YOUR PROJECT ID"
DATASET_NAME = "bqml"
DESTINATION_TABLE = "dbt_ads_bqml_preds"

TRAINING_QUERY = (
"CREATE OR REPLACE MODEL bqml.dbt_ads_airflow_model "
"OPTIONS "
"(model_type = 'ARIMA_PLUS', "
"time_series_timestamp_col = 'parsed_date', "
"time_series_data_col = 'daily_impressions', "
"auto_arima = TRUE, "
"data_frequency = 'AUTO_FREQUENCY', "
"decompose_time_series = TRUE "
") AS "
"SELECT "
"timestamp(date_day) as parsed_date, "
"SUM(impressions) as daily_impressions "
"FROM `" + PROJECT_ID + ".bqml.ad_reporting` "
"GROUP BY date_day;"
)

SERVING_QUERY = (
"SELECT string(forecast_timestamp) as forecast_timestamp, "
"forecast_value, "
"standard_error, "
"confidence_level, "
"prediction_interval_lower_bound, "
"prediction_interval_upper_bound, "
"confidence_interval_lower_bound, "
"confidence_interval_upper_bound "
"FROM ML.FORECAST(MODEL `"
+ PROJECT_ID
+ ".bqml.dbt_ads_airflow_model`,STRUCT(30 AS horizon, 0.8 AS confidence_level));"
)


def ml_branch(ds, **kwargs):
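# Route on the "train" key of the run's params: train the model when it is
# present and truthy, otherwise go straight to generating predictions.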
if "train" in kwargs["params"] and kwargs["params"]["train"]:
return "train_model"
else:
return "get_predictions"


default_args = {
"owner": "Airflow",
"start_date": datetime(2021, 4, 6),
}

dag = DAG(
dag_id="example_fivetran_bqml",
default_args=default_args,
schedule_interval=timedelta(days=1),
catchup=False,
)

with dag:
linkedin_sync = FivetranOperator(
task_id="linkedin-sync",
fivetran_conn_id="fivetran_default",
connector_id="{{ var.value.linkedin_connector_id }}",
)

linkedin_sensor = FivetranSensor(
task_id="linkedin-sensor",
fivetran_conn_id="fivetran_default",
connector_id="{{ var.value.linkedin_connector_id }}",
poke_interval=5,
)

twitter_sync = FivetranOperator(
task_id="twitter-sync",
fivetran_conn_id="fivetran_default",
connector_id="{{ var.value.twitter_connector_id }}",
)

twitter_sensor = FivetranSensor(
task_id="twitter-sensor",
fivetran_conn_id="fivetran_default",
connector_id="{{ var.value.twitter_connector_id }}",
poke_interval=5,
)

dbt_run = SSHOperator(
task_id="dbt_ad_reporting",
command="cd dbt_ad_reporting ; ~/.local/bin/dbt run -m +ad_reporting",
ssh_conn_id="dbtvm",
)

ml_branch = BranchPythonOperator(task_id="ml_branch", python_callable=ml_branch, provide_context=True)

train_model = BigQueryExecuteQueryOperator(
task_id="train_model", sql=TRAINING_QUERY, use_legacy_sql=False
)

get_preds = BigQueryExecuteQueryOperator(
task_id="get_predictions",
sql=SERVING_QUERY,
use_legacy_sql=False,
destination_dataset_table=DATASET_NAME + "." + DESTINATION_TABLE,
write_disposition="WRITE_APPEND",
)

print_preds = BigQueryGetDataOperator(
task_id="print_predictions", dataset_id=DATASET_NAME, table_id=DESTINATION_TABLE
)

linkedin_sync >> linkedin_sensor
twitter_sync >> twitter_sensor

[linkedin_sensor, twitter_sensor] >> dbt_run

dbt_run >> ml_branch >> [train_model, get_preds]
get_preds >> print_preds
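Because ml_branch inspects params, whether the training query runs is decided when the DAG is triggered. A hedged sketch of starting a run with training enabled via Airflow's local API client, assuming (as in standard Airflow 2.x behaviour) that the trigger conf is merged into the params the branch callable reads:

from airflow.api.client.local_client import Client

client = Client(None, None)
# {"train": True} makes ml_branch return "train_model"; omitting the key
# (or setting it to False) routes the run to "get_predictions" instead.
client.trigger_dag(dag_id="example_fivetran_bqml", conf={"train": True})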
50 changes: 50 additions & 0 deletions fivetran_provider_async/example_dags/example_fivetran_dbt.py
@@ -0,0 +1,50 @@
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.ssh.operators.ssh import SSHOperator

from fivetran_provider_async.operators import FivetranOperator
from fivetran_provider_async.sensors import FivetranSensor

default_args = {
"owner": "Airflow",
"start_date": datetime(2021, 4, 6),
}

with DAG(
dag_id="ad_reporting_dag",
default_args=default_args,
schedule_interval=timedelta(days=1),
catchup=False,
) as dag:
linkedin_sync = FivetranOperator(
task_id="linkedin-ads-sync",
connector_id="{{ var.value.linkedin_connector_id }}",
)

linkedin_sensor = FivetranSensor(
task_id="linkedin-sensor",
connector_id="{{ var.value.linkedin_connector_id }}",
poke_interval=600,
)

twitter_sync = FivetranOperator(
task_id="twitter-ads-sync",
connector_id="{{ var.value.twitter_connector_id }}",
)

twitter_sensor = FivetranSensor(
task_id="twitter-sensor",
connector_id="{{ var.value.twitter_connector_id }}",
poke_interval=600,
)

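# Run the dbt ad_reporting package on the remote dbt VM once both Fivetran
# syncs have landed; the "+ad_reporting" selector also builds its upstream models.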
dbt_run = SSHOperator(
task_id="dbt_ad_reporting",
command="cd dbt_ad_reporting ; ~/.local/bin/dbt run -m +ad_reporting",
ssh_conn_id="dbtvm",
)

linkedin_sync >> linkedin_sensor
twitter_sync >> twitter_sensor
[linkedin_sensor, twitter_sensor] >> dbt_run
40 changes: 40 additions & 0 deletions fivetran_provider_async/example_dags/example_fivetran_xcom.py
@@ -0,0 +1,40 @@
import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

from fivetran_provider_async.operators import FivetranOperator
from fivetran_provider_async.sensors import FivetranSensor

default_args = {
"owner": "Airflow",
"start_date": datetime(2021, 4, 6),
"provide_context": True,
}

dag = DAG(
dag_id="example_fivetran_xcom",
default_args=default_args,
schedule_interval=timedelta(days=1),
catchup=False,
)

with dag:
fivetran_operator = FivetranOperator(
task_id="fivetran-operator",
fivetran_conn_id="fivetran_default",
connector_id="{{ var.value.connector_id }}",
)

delay_task = PythonOperator(task_id="delay_python_task", python_callable=lambda: time.sleep(60))

fivetran_sensor = FivetranSensor(
task_id="fivetran-sensor",
fivetran_conn_id="fivetran_default",
connector_id="{{ var.value.connector_id }}",
poke_interval=5,
xcom="{{ task_instance.xcom_pull('fivetran-operator', key='return_value') }}",
)

fivetran_operator >> delay_task >> fivetran_sensor
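The templated xcom argument above hands the sensor whatever value the fivetran-operator task pushed to XCom under the default return_value key. A minimal illustration of performing the same pull from an ordinary Python task; the peek_xcom task and its callable are hypothetical additions, not part of the example:

from airflow.operators.python import PythonOperator


def read_operator_xcom(ti):
    # Pull the value the "fivetran-operator" task pushed under "return_value",
    # the same value the sensor's templated `xcom` argument resolves to.
    pushed = ti.xcom_pull(task_ids="fivetran-operator", key="return_value")
    print(f"fivetran-operator pushed: {pushed}")


peek_xcom = PythonOperator(
    task_id="peek_xcom", python_callable=read_operator_xcom, dag=dag
)
fivetran_operator >> peek_xcom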