Skip to content

Commit

Permalink
Merge pull request #1089 from CityofToronto/1086-temp_bluetooth_check…
Browse files Browse the repository at this point in the history
…_readers-dag-failing-silently_1

#1086 switch alerts to use custom operator
  • Loading branch information
chmnata authored Dec 3, 2024
2 parents f6df12b + 0fe86f1 commit 1ab934c
Showing 1 changed file with 14 additions and 21 deletions.
35 changes: 14 additions & 21 deletions dags/bluetooth_check_readers_temp.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ def blip_pipeline():
pipeline_check = SQLCheckOperatorWithReturnValue(
task_id = 'pipeline_check',
conn_id = 'bt_bot',
sql = '''SELECT *
sql = '''SELECT (COUNT(*) > 0) AS "_check",
'There are no data inserted for '|| '{{ ds }}' AS msg
FROM bluetooth.aggr_5min
WHERE datetime_bin >='{{ ds }}' and datetime_bin < '{{ tomorrow_ds }}'
LIMIT 1'''
Expand All @@ -85,33 +86,25 @@ def blip_pipeline():
)

# Send slack channel a msg when there are broken readers
@task
def broken_readers(ds, **context):
# Send slack channel a msg when there are broken readers.
bt_postgres = PostgresHook("bt_bot")
con = bt_postgres.get_conn()
broken_readers = SQLCheckOperatorWithReturnValue(
task_id="broken_readers",
sql='''
SELECT
NOT(COUNT(*) > 0) AS "_check",
'There are ' || COUNT(*) || ' cameras not reporting data as of yesterday. '
|| array_agg(read_id || ': ' || reader_name) AS msg
FROM bluetooth.broken_readers_temp('{{ ds}}')
''',
conn_id="bt_bot"
)

with con.cursor() as cursor:
sql_query = '''SELECT * from bluetooth.broken_readers_temp(%s::date)'''
cursor.execute(sql_query, (ds,))
broken_readers = cursor.fetchall()
formatted_br = format_br_list(broken_readers)

if len(broken_readers) == 0:
pass
else:
send_slack_msg(
context=context,
msg="The following bluetooth readers are not reporting data as of yesterday:",
attachments=formatted_br
)

## Flow ##
# Check blip data was aggregated as of yesterday then update routes table and reader status
# Lastly alert slack channel if there are broken readers
pipeline_check >> [
update_routes_table,
update_reader_status
] >> broken_readers()
] >> broken_readers

blip_pipeline()

0 comments on commit 1ab934c

Please sign in to comment.