diff --git a/dags/bluetooth_check_readers_temp.py b/dags/bluetooth_check_readers_temp.py index 338e4cc07..384ed25c9 100644 --- a/dags/bluetooth_check_readers_temp.py +++ b/dags/bluetooth_check_readers_temp.py @@ -60,7 +60,8 @@ def blip_pipeline(): pipeline_check = SQLCheckOperatorWithReturnValue( task_id = 'pipeline_check', conn_id = 'bt_bot', - sql = '''SELECT * + sql = '''SELECT (COUNT(*) > 0) AS "_check", + 'There are no data inserted for '|| '{{ ds }}' AS msg FROM bluetooth.aggr_5min WHERE datetime_bin >='{{ ds }}' and datetime_bin < '{{ tomorrow_ds }}' LIMIT 1''' @@ -85,26 +86,18 @@ def blip_pipeline(): ) # Send slack channel a msg when there are broken readers - @task - def broken_readers(ds, **context): - # Send slack channel a msg when there are broken readers. - bt_postgres = PostgresHook("bt_bot") - con = bt_postgres.get_conn() + broken_readers = SQLCheckOperatorWithReturnValue( + task_id="broken_readers", + sql=''' + SELECT + NOT(COUNT(*) > 0) AS "_check", + 'There are ' || COUNT(*) || ' cameras not reporting data as of yesterday. ' + || array_agg(read_id || ': ' || reader_name) AS msg + FROM bluetooth.broken_readers_temp('{{ ds}}') + ''', + conn_id="bt_bot" + ) - with con.cursor() as cursor: - sql_query = '''SELECT * from bluetooth.broken_readers_temp(%s::date)''' - cursor.execute(sql_query, (ds,)) - broken_readers = cursor.fetchall() - formatted_br = format_br_list(broken_readers) - - if len(broken_readers) == 0: - pass - else: - send_slack_msg( - context=context, - msg="The following bluetooth readers are not reporting data as of yesterday:", - attachments=formatted_br - ) ## Flow ## # Check blip data was aggregated as of yesterday then update routes table and reader status @@ -112,6 +105,6 @@ def broken_readers(ds, **context): pipeline_check >> [ update_routes_table, update_reader_status - ] >> broken_readers() + ] >> broken_readers blip_pipeline() \ No newline at end of file