From b3df6d969f27b3f3e5df31142841906f79c39f7b Mon Sep 17 00:00:00 2001 From: chmnata Date: Tue, 29 Oct 2024 15:45:04 +0000 Subject: [PATCH 1/2] #1086switch alerts to use custom operator --- dags/bluetooth_check_readers_temp.py | 34 +++++++++++----------------- 1 file changed, 13 insertions(+), 21 deletions(-) diff --git a/dags/bluetooth_check_readers_temp.py b/dags/bluetooth_check_readers_temp.py index 338e4cc07..6d9e52133 100644 --- a/dags/bluetooth_check_readers_temp.py +++ b/dags/bluetooth_check_readers_temp.py @@ -60,7 +60,7 @@ def blip_pipeline(): pipeline_check = SQLCheckOperatorWithReturnValue( task_id = 'pipeline_check', conn_id = 'bt_bot', - sql = '''SELECT * + sql = '''SELECT (COUNT(*) > 0) AS "_check" FROM bluetooth.aggr_5min WHERE datetime_bin >='{{ ds }}' and datetime_bin < '{{ tomorrow_ds }}' LIMIT 1''' @@ -85,26 +85,18 @@ def blip_pipeline(): ) # Send slack channel a msg when there are broken readers - @task - def broken_readers(ds, **context): - # Send slack channel a msg when there are broken readers. - bt_postgres = PostgresHook("bt_bot") - con = bt_postgres.get_conn() + broken_readers = SQLCheckOperatorWithReturnValue( + task_id="broken_readers", + sql=''' + SELECT + NOT(COUNT(*) > 0) AS "_check", + 'There are ' || COUNT(*) || ' cameras not reporting data as of yesterday. ' + || array_agg(read_id || ': ' || reader_name) AS msg + FROM bluetooth.broken_readers_temp('{{ ds}}') + ''', + conn_id="bt_bot" + ) - with con.cursor() as cursor: - sql_query = '''SELECT * from bluetooth.broken_readers_temp(%s::date)''' - cursor.execute(sql_query, (ds,)) - broken_readers = cursor.fetchall() - formatted_br = format_br_list(broken_readers) - - if len(broken_readers) == 0: - pass - else: - send_slack_msg( - context=context, - msg="The following bluetooth readers are not reporting data as of yesterday:", - attachments=formatted_br - ) ## Flow ## # Check blip data was aggregated as of yesterday then update routes table and reader status @@ -112,6 +104,6 @@ def broken_readers(ds, **context): pipeline_check >> [ update_routes_table, update_reader_status - ] >> broken_readers() + ] >> broken_readers blip_pipeline() \ No newline at end of file From 0fe86f1d66f4ef3f33e6145c763930bb828fec38 Mon Sep 17 00:00:00 2001 From: chmnata Date: Tue, 3 Dec 2024 15:47:20 +0000 Subject: [PATCH 2/2] #1086 Add message for no data alert --- dags/bluetooth_check_readers_temp.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dags/bluetooth_check_readers_temp.py b/dags/bluetooth_check_readers_temp.py index 6d9e52133..384ed25c9 100644 --- a/dags/bluetooth_check_readers_temp.py +++ b/dags/bluetooth_check_readers_temp.py @@ -60,7 +60,8 @@ def blip_pipeline(): pipeline_check = SQLCheckOperatorWithReturnValue( task_id = 'pipeline_check', conn_id = 'bt_bot', - sql = '''SELECT (COUNT(*) > 0) AS "_check" + sql = '''SELECT (COUNT(*) > 0) AS "_check", + 'There are no data inserted for '|| '{{ ds }}' AS msg FROM bluetooth.aggr_5min WHERE datetime_bin >='{{ ds }}' and datetime_bin < '{{ tomorrow_ds }}' LIMIT 1'''