From aeed63c2e1ab35fcc0ad39408c9816d55e2f9a9c Mon Sep 17 00:00:00 2001 From: gabrielwol <80077912+gabrielwol@users.noreply.github.com> Date: Fri, 22 Mar 2024 22:16:16 +0000 Subject: [PATCH] #905 add a monthly dag --- dags/miovision_open_data.py | 83 +++++++++++++++++++++++++++++++++++++ 1 file changed, 83 insertions(+) create mode 100644 dags/miovision_open_data.py diff --git a/dags/miovision_open_data.py b/dags/miovision_open_data.py new file mode 100644 index 000000000..8ba4f3522 --- /dev/null +++ b/dags/miovision_open_data.py @@ -0,0 +1,83 @@ +r"""### Monthly Miovision Open Data DAG +Pipeline to run monthly Miovision aggregations for Open Data. +""" +import sys +import os + +from airflow.decorators import dag, task +from datetime import timedelta +from airflow.models import Variable +from airflow.providers.postgres.operators.postgres import PostgresOperator +from airflow.macros import ds_format + +import logging +import pendulum + +try: + repo_path = os.path.abspath(os.path.dirname(os.path.dirname(os.path.realpath(__file__)))) + sys.path.insert(0, repo_path) + from dags.dag_functions import task_fail_slack_alert, send_slack_msg +except: + raise ImportError("Cannot import DAG helper functions.") + +LOGGER = logging.getLogger(__name__) +logging.basicConfig(level=logging.DEBUG) + +DAG_NAME = 'miovision_open_data' +DAG_OWNERS = Variable.get('dag_owners', deserialize_json=True).get(DAG_NAME, ["Unknown"]) + +default_args = { + 'owner': ','.join(DAG_OWNERS), + 'depends_on_past':False, + #set earlier start_date + catchup when ready? + 'start_date': pendulum.datetime(2023, 12, 18, tz="America/Toronto"), + 'email_on_failure': False, + 'email_on_success': False, + 'retries': 0, + 'retry_delay': timedelta(minutes=5), + 'on_failure_callback': task_fail_slack_alert +} + +@dag( + dag_id=DAG_NAME, + default_args=default_args, + schedule='0 14 3 * *', # 2pm, 3rd day of each month + catchup=False, + tags=["miovision", "open_data"], + doc_md=__doc__ +) +def miovision_open_data_dag(): + + #considered whether it should have an external task sensor + #for the first of the month. Decided it should run later + #to give time for anomalous_range updates if any. + + refresh_monthly_open_data = PostgresOperator( + task_id='refresh_monthly_open_data', + sql="SELECT gwolofs.insert_miovision_open_data_monthly_summary('{{ macros.ds_format(ds, '%Y-%m-%d', '%Y-%m-01') }}'::date)", + postgres_conn_id='miovision_api_bot', + autocommit=True + ) + + refresh_15min_open_data = PostgresOperator( + task_id='refresh_15min_open_data', + sql="SELECT gwolofs.insert_miovision_15min_open_data('{{ macros.ds_format(ds, '%Y-%m-%d', '%Y-%m-01') }}'::date)", + postgres_conn_id='miovision_api_bot', + autocommit=True + ) + + @task( + retries=0, + trigger_rule='all_done', + doc_md="""A status message to report DAG success.""" + ) + def status_message(ds = None, **context): + mnth = ds_format(ds, '%Y-%m-%d', '%Y-%m-01') + send_slack_msg( + context=context, + msg=f":meow_miovision: :open_data_to: DAG ran successfully for {mnth} :white_check_mark:" + ) + + refresh_monthly_open_data >> refresh_15min_open_data >> status_message() + +miovision_open_data_dag()