From 5b9148d9150178f7c44928a7d9a60576a62b227f Mon Sep 17 00:00:00 2001 From: gabrielwol <80077912+gabrielwol@users.noreply.github.com> Date: Fri, 5 Jul 2024 18:48:52 +0000 Subject: [PATCH 1/9] #1007 new alerts api schema --- dags/miovision_pull.py | 22 +- volumes/miovision/api/pull_alert.py | 195 ++++++++---------- .../inserts/insert-miovision_alerts_new.sql | 21 ++ .../sql/table/create-table-alerts_new.sql | 43 ++++ 4 files changed, 159 insertions(+), 122 deletions(-) create mode 100644 volumes/miovision/sql/inserts/insert-miovision_alerts_new.sql create mode 100644 volumes/miovision/sql/table/create-table-alerts_new.sql diff --git a/dags/miovision_pull.py b/dags/miovision_pull.py index 24e86bac0..7d5ac8165 100644 --- a/dags/miovision_pull.py +++ b/dags/miovision_pull.py @@ -29,7 +29,7 @@ pull_data, find_gaps, aggregate_15_min_mvt, aggregate_15_min, aggregate_volumes_daily, get_report_dates, get_intersection_info, agg_zero_volume_anomalous_ranges ) - from volumes.miovision.api.pull_alert import pull_alerts + from volumes.miovision.api.pull_alert import run_alerts_api except: raise ImportError("Cannot import DAG helper functions.") @@ -114,18 +114,12 @@ def pull_miovision(ds = None, **context): with mio_postgres.get_conn() as conn: pull_data(conn, start_time, end_time, INTERSECTION, key) - @task(task_id = 'pull_alerts', trigger_rule='none_failed', retries = 1) - def pull_alerts_task(ds): - CONFIG = configparser.ConfigParser() - CONFIG.read(API_CONFIG_PATH) - api_key=CONFIG['API'] - key=api_key['key'] - start_date = dateutil.parser.parse(str(ds)) - end_date = dateutil.parser.parse(str(ds_add(ds, 1))) - mio_postgres = PostgresHook("miovision_api_bot") - - with mio_postgres.get_conn() as conn: - pull_alerts(conn, start_date, end_date, key) + @task(trigger_rule='none_failed', retries = 1) + def pull_alerts(ds): + run_alerts_api( + start_date=ds, + end_date=ds_add(ds, 1) + ) @task_group(tooltip="Tasks to aggregate newly pulled Miovision data.") def miovision_agg(): @@ 
-266,7 +260,7 @@ def data_checks(): ( check_partitions() >> - [pull_miovision(), pull_alerts_task()] >> + [pull_miovision(), pull_alerts()] >> miovision_agg() >> t_done >> data_checks() diff --git a/volumes/miovision/api/pull_alert.py b/volumes/miovision/api/pull_alert.py index b25a30006..5dfbf41fb 100644 --- a/volumes/miovision/api/pull_alert.py +++ b/volumes/miovision/api/pull_alert.py @@ -1,12 +1,19 @@ +import os import datetime +import dateutil +import pytz import json from requests import Session import logging -import configparser import pandas as pd -from psycopg2 import connect +import numpy as np +import click +from psycopg2 import sql from psycopg2.extras import execute_values +from airflow.hooks.base_hook import BaseHook +from airflow.providers.postgres.hooks.postgres import PostgresHook + def logger(): logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) @@ -21,15 +28,37 @@ def logger(): session = Session() session.proxies = {} -url = 'https://api.miovision.com/alerts/' +URL_BASE = 'https://api.miovision.one/api/v1' +TZ = pytz.timezone("Canada/Eastern") +time_delta = datetime.timedelta(days=1) +default_start = str(datetime.date.today()-time_delta) +default_end = str(datetime.date.today()) + +SQL_DIR = os.path.join(os.path.dirname(os.path.abspath(os.path.dirname(__file__))), 'sql') + +CONTEXT_SETTINGS = dict( + default_map={'run_alerts_api_cli': {'flag': 0}} +) + +def run_alerts_api(start_date: str, end_date: str): + api_key = BaseHook.get_connection('miovision_api_key') + key = api_key.extra_dejson['key'] + mio_postgres = PostgresHook("miovision_api_bot") + start_date = dateutil.parser.parse(str(start_date)) + end_date = dateutil.parser.parse(str(end_date)) + with mio_postgres.get_conn() as conn: + pull_alerts(conn, start_date, end_date, key) + +@click.group(context_settings=CONTEXT_SETTINGS) +def cli(): + pass + +@cli.command() +@click.option('--start_date', default=default_start, help='format is YYYY-MM-DD for start date') 
+@click.option('--end_date' , default=default_end, help='format is YYYY-MM-DD for end date & excluding the day itself') -CONFIG = configparser.ConfigParser() -CONFIG.read('/etc/airflow/data_scripts/volumes/miovision/api/config.cfg') -api_key=CONFIG['API'] -key=api_key['key'] -dbset = CONFIG['DBSETTINGS'] -conn = connect(**dbset) -conn.autocommit = True +def run_alerts_api_cli(start_date, end_date): + return run_alerts_api(start_date, end_date) class MiovAlertPuller: """Miovision API puller. @@ -44,77 +73,46 @@ class MiovAlertPuller: key : str Miovision API access key. """ - headers = {'Content-Type': 'application/json', - 'Authorization': ''} - - def __init__(self, active_time, key): - self.active_time = active_time - self.headers['Authorization'] = key - + 'apikey': ''} + def __init__(self, url, start_time, end_time, key): + self.url = url + self.start_time = start_time.isoformat() + self.end_time = end_time.isoformat() + self.headers['apikey'] = key def get_response(self): """Requests data from API.""" - - params = {'activeAtTime': self.active_time} - + params = {'startDateTime': self.start_time, + 'endDateTime': self.end_time} response = session.get( - url, + url=self.url, params=params, - headers=self.headers, - proxies=session.proxies) - + headers=self.headers + ) # Return if we get a success response code, or raise an error if not. 
if response.status_code == 200: return response elif response.status_code == 401: raise Exception('Error' + str(response.status_code)) - + elif response.status_code == 404: + raise Exception('Error' + str(response.status_code)) + def process_timestamp(self, utc_timestamp): + if utc_timestamp is None: + return None + utc_timestamp = dateutil.parser.parse(str(utc_timestamp)) + #convert timestamp from utc to local TZ + local_timestamp = utc_timestamp.replace(tzinfo=pytz.utc).astimezone(TZ) + return local_timestamp.replace(tzinfo=None) def process_alert(self, row): """Process one row of Alert API output.""" - # Return time, classification_uid, leg, movement, volume. - return (self.active_time, row['intersectionId'], row['type']) - + start_timestamp = self.process_timestamp(row['alertStartDateTime']) + end_timestamp = self.process_timestamp(row['alertEndDateTime']) + return (row['alertId'], start_timestamp, end_timestamp, row['intersectionId'], row['type']) def process_response(self, response): """Process the output of self.get_response.""" data = json.loads(response.content.decode('utf-8')) - return [self.process_alert(row) for row in data] - -#insert data script -insert_data = """WITH new_values AS ( - SELECT intersection_id::text, alert::text, start_time::timestamp, end_time::timestamp - FROM (VALUES %s) AS new_values(intersection_id, alert, start_time, end_time) -), - ---extend existing alerts -updated AS ( - UPDATE miovision_api.alerts - SET end_time = new_values.end_time - FROM new_values - WHERE - alerts.intersection_id = new_values.intersection_id - AND alerts.alert = new_values.alert - --where old end = new start - AND alerts.end_time = new_values.start_time - --returns the new values used for updates (to be excluded from insert) - RETURNING new_values.* -) - ---insert new alerts (exclude updated) -INSERT INTO miovision_api.alerts (intersection_id, alert, start_time, end_time) -SELECT intersection_id, alert, start_time, end_time FROM new_values -EXCEPT -SELECT 
intersection_id, alert, start_time, end_time FROM updated -ON CONFLICT (intersection_id, alert, start_time) DO NOTHING; - ---update foreign key referencing miovision_api.intersections ---handles new records as well as old records with null intersection_uid (newly added intersections) -UPDATE miovision_api.alerts -SET intersection_uid = i.intersection_uid -FROM miovision_api.intersections AS i -WHERE alerts.intersection_id = i.id -AND alerts.intersection_uid IS NULL; -""" + return [self.process_alert(row) for row in data['alerts']], data['links']['next'] def pull_alerts(conn: any, start_date: datetime, end_date: datetime, key: str): """Miovision Alert Puller @@ -133,56 +131,37 @@ def pull_alerts(conn: any, start_date: datetime, end_date: datetime, key: str): key : str Miovision API access key. """ - - STEP_SIZE = datetime.timedelta(minutes=5) - - #establish list of timestamps to iterate over + start_date = TZ.localize(start_date) + end_date = TZ.localize(end_date) if end_date < start_date: raise ValueError('end_time is not greater than start_time.') - dt = start_date - dt_list = [dt, ] - while(dt < end_date): - dt = dt + STEP_SIZE - dt_list.append(dt) - logger.info('Pulling Miovision alerts from %s to %s.', start_date, end_date) - - #pull alerts at each timestamps and append to list + #pull alerts from each page and append to list dfs = [] - for dt in dt_list: - miovpull = MiovAlertPuller(dt, key) - test = miovpull.get_response() - response = miovpull.process_response(test) - df = pd.DataFrame(response).drop_duplicates() + pageSize = 100 + url = f"{URL_BASE}/alerts?pageSize={pageSize}&pageNumber=0" + while url is not None: + miovpull = MiovAlertPuller(url, start_date, end_date, key) + response = miovpull.get_response() + alerts, url = miovpull.process_response(response) + df = pd.DataFrame(alerts) dfs.append(df) - logger.info('Done pulling. 
Transforming alerts.') - #create pandas df and restructure final = pd.concat(dfs, ignore_index=True) - final.rename(columns={0: "time", 1: "intersection_id", 2: "alert"}, inplace = True) - final['time'] = pd.to_datetime(final['time']) - final.sort_values(by=['intersection_id', 'alert', 'time'], inplace = True) - final.reset_index(drop = True) - final.drop_duplicates(inplace = True, ignore_index = True) - - #group by alert and time - final['shifted'] = final.groupby(by = ['intersection_id', 'alert'])['time'].shift(1) - final['time_lag'] = final['time'] - final['shifted'] - final['start_time'] = final['time'] - - #iterate through and check if consecutive - for index, row in final.iterrows(): - if (row['time_lag'] == pd.Timedelta(f"{STEP_SIZE} minutes")): #lag = step size - final.at[index, 'start_time'] = final.at[index-1, 'start_time'] #assign previous start time - - #find start and end time of consecutive values - summary = final.groupby(by = ['intersection_id', 'alert', 'start_time']).agg({'time': ['max']}) - summary = summary.reset_index() - + final.rename(columns={0: "alertId", 1: "alertStartDateTime", 2: "alertEndDateTime", 3: "intersection_id", 4: "type"}, inplace = True) + final.replace({np.NaN: None}, inplace = True) #convert to tuples for inserting - values = list(summary.itertuples(index=False, name=None)) - - logger.info('Inserting values into `miovision_api.alerts`.') + values = list(final.itertuples(index=False, name=None)) + + #sql insert data script + fpath = os.path.join(SQL_DIR, 'inserts/insert-miovision_alerts_new.sql') + with open(fpath, 'r', encoding='utf-8') as file: + insert_query = sql.SQL(file.read()) + + logger.info('Inserting values into `miovision_api.alerts_new`.') with conn.cursor() as cur: - execute_values(cur, insert_data, values) \ No newline at end of file + execute_values(cur, insert_query, values) + +if __name__ == '__main__': + cli() \ No newline at end of file diff --git 
a/volumes/miovision/sql/inserts/insert-miovision_alerts_new.sql b/volumes/miovision/sql/inserts/insert-miovision_alerts_new.sql new file mode 100644 index 000000000..32fd90c2f --- /dev/null +++ b/volumes/miovision/sql/inserts/insert-miovision_alerts_new.sql @@ -0,0 +1,21 @@ +INSERT INTO miovision_api.alerts_new AS n ( + alert_id, start_time, end_time, intersection_id, alert +) +VALUES %s +ON CONFLICT (alert_id) +DO UPDATE +SET + intersection_id = EXCLUDED.intersection_id, + alert = EXCLUDED.alert, + start_time = EXCLUDED.start_time, + end_time = EXCLUDED.end_time +WHERE n.alert_id = EXCLUDED.alert_id; + +--update foreign key referencing miovision_api.intersections +--handles new records as well as old records with null intersection_uid (newly added intersections) +UPDATE miovision_api.alerts_new AS n +SET intersection_uid = i.intersection_uid +FROM miovision_api.intersections AS i +WHERE + n.intersection_id = i.id + AND n.intersection_uid IS NULL; \ No newline at end of file diff --git a/volumes/miovision/sql/table/create-table-alerts_new.sql b/volumes/miovision/sql/table/create-table-alerts_new.sql new file mode 100644 index 000000000..2a3a0959d --- /dev/null +++ b/volumes/miovision/sql/table/create-table-alerts_new.sql @@ -0,0 +1,43 @@ +-- Table: miovision_api.alerts_new + +-- DROP TABLE IF EXISTS miovision_api.alerts_new; + +CREATE TABLE IF NOT EXISTS miovision_api.alerts_new +( + alert_id text COLLATE pg_catalog."default" NOT NULL, + intersection_id text COLLATE pg_catalog."default" NOT NULL, + alert text COLLATE pg_catalog."default" NOT NULL, + start_time timestamp without time zone NOT NULL, + end_time timestamp without time zone, + intersection_uid integer, + CONSTRAINT miovision_alerts_pkey_new PRIMARY KEY (alert_id), + CONSTRAINT miov_alert_intersection_fkey_new FOREIGN KEY (intersection_uid) + REFERENCES miovision_api.intersections (intersection_uid) MATCH FULL + ON UPDATE NO ACTION + ON DELETE NO ACTION +) + +TABLESPACE pg_default; + +ALTER TABLE IF 
EXISTS miovision_api.alerts_new +OWNER TO miovision_admins; + +REVOKE ALL ON TABLE miovision_api.alerts_new FROM bdit_humans; +REVOKE ALL ON TABLE miovision_api.alerts_new FROM miovision_api_bot; + +GRANT SELECT ON TABLE miovision_api.alerts_new TO bdit_humans; + +GRANT ALL ON TABLE miovision_api.alerts_new TO miovision_admins; + +GRANT INSERT, SELECT, DELETE, UPDATE ON TABLE miovision_api.alerts_new TO miovision_api_bot; + +COMMENT ON TABLE miovision_api.alerts_new IS E'' +'This table contains Miovision alerts pulled by a daily Airflow DAG `miovision_pull`, `pull_alerts` task. ' +'Note: a more detailed description is available on Miovision One.'; + +COMMENT ON COLUMN miovision_api.alerts_new.intersection_id +IS 'The intersection id, corresponding to intersections.intersection_id column'; + +COMMENT ON COLUMN miovision_api.alerts_new.alert IS E'' +'Short text description of the alert. More detail on the different alerts can be found here: +https://help.miovision.com/s/article/Alert-and-Notification-Types'; From ba5b8d9f1892a973e61018e180c045f34e384cd2 Mon Sep 17 00:00:00 2001 From: gabrielwol <80077912+gabrielwol@users.noreply.github.com> Date: Fri, 5 Jul 2024 18:49:32 +0000 Subject: [PATCH 2/9] #1007 merge open_issues changes from #950 + change to new alerts table --- .../sql/views/create-view-open_issues.sql | 32 +++++++++++++++---- 1 file changed, 25 insertions(+), 7 deletions(-) diff --git a/volumes/miovision/sql/views/create-view-open_issues.sql b/volumes/miovision/sql/views/create-view-open_issues.sql index 8fc959feb..76ca304f7 100644 --- a/volumes/miovision/sql/views/create-view-open_issues.sql +++ b/volumes/miovision/sql/views/create-view-open_issues.sql @@ -1,6 +1,21 @@ --DROP VIEW miovision_api.open_issues; CREATE OR REPLACE VIEW miovision_api.open_issues AS +WITH alerts AS ( + SELECT + ar.uid, + string_agg(DISTINCT alerts.alert, '; ') AS alerts + FROM miovision_api.anomalous_ranges AS ar + LEFT JOIN miovision_api.alerts_new AS alerts + ON 
alerts.intersection_uid = ar.intersection_uid + AND alerts.start_time >= ar.range_start + AND ( + alerts.end_time < ar.range_end + OR ar.range_end IS NULL + ) + GROUP BY ar.uid +) + SELECT ar.uid, ar.intersection_uid, @@ -13,11 +28,12 @@ SELECT WHEN ar.classification_uid IS NULL THEN 'All modes' ELSE c.classification END, + ar.leg, ar.range_start::date, (current_timestamp AT TIME ZONE 'EST5EDT')::date - ar.range_start::date AS num_days, ar.notes, SUM(v.volume) AS last_week_volume, - string_agg(DISTINCT alerts.alert, '; ') AS alerts + alerts.alerts FROM miovision_api.anomalous_ranges AS ar --keep rows with null classification_uid LEFT JOIN miovision_api.classifications AS c USING (classification_uid) @@ -26,18 +42,19 @@ JOIN miovision_api.intersections AS i USING (intersection_uid) --find last week volume LEFT JOIN miovision_api.volumes AS v ON ar.intersection_uid = v.intersection_uid + --volume within the last 7 days and after AR started + AND v.datetime_bin >= ar.range_start + --prune the partitions AND v.datetime_bin >= current_date - interval '7 days' AND ( ar.classification_uid = v.classification_uid OR ar.classification_uid IS NULL ) -LEFT JOIN miovision_api.alerts - ON alerts.intersection_id = i.id - AND alerts.start_time >= ar.range_start AND ( - alerts.end_time < ar.range_end - OR ar.range_end IS NULL + ar.leg = v.leg + OR ar.leg IS NULL ) +LEFT JOIN alerts ON alerts.uid = ar.uid WHERE ar.problem_level <> 'valid-caveat' --currently active @@ -56,7 +73,8 @@ GROUP BY ar.classification_uid, c.classification, ar.range_start, - ar.notes + ar.notes, + alerts.alerts ORDER BY ar.intersection_uid, ar.range_start, From 3d094e835016cb89cd9608e0418b72dc74f9e55b Mon Sep 17 00:00:00 2001 From: gabrielwol <80077912+gabrielwol@users.noreply.github.com> Date: Fri, 5 Jul 2024 19:03:03 +0000 Subject: [PATCH 3/9] #1007 replace create-table-alerts --- .../sql/table/create-table-alerts.sql | 51 ++++++++----------- 1 file changed, 21 insertions(+), 30 deletions(-) diff --git 
a/volumes/miovision/sql/table/create-table-alerts.sql b/volumes/miovision/sql/table/create-table-alerts.sql index be2fcfa1d..ecf461d3d 100644 --- a/volumes/miovision/sql/table/create-table-alerts.sql +++ b/volumes/miovision/sql/table/create-table-alerts.sql @@ -1,52 +1,43 @@ --- Table: miovision_api.alerts +-- Table: miovision_api.alerts_new --- DROP TABLE IF EXISTS miovision_api.alerts; +-- DROP TABLE IF EXISTS miovision_api.alerts_new; -CREATE TABLE IF NOT EXISTS miovision_api.alerts +CREATE TABLE IF NOT EXISTS miovision_api.alerts_new ( + alert_id text COLLATE pg_catalog."default" NOT NULL, intersection_id text COLLATE pg_catalog."default" NOT NULL, alert text COLLATE pg_catalog."default" NOT NULL, start_time timestamp without time zone NOT NULL, - end_time timestamp without time zone, + end_time timestamp without time zone, intersection_uid integer, - CONSTRAINT miovision_alerts_pkey PRIMARY KEY (intersection_id, alert, start_time), - CONSTRAINT miov_alert_intersection_fkey FOREIGN KEY (intersection_uid) + CONSTRAINT miovision_alerts_pkey_new PRIMARY KEY (alert_id), + CONSTRAINT miov_alert_intersection_fkey_new FOREIGN KEY (intersection_uid) REFERENCES miovision_api.intersections (intersection_uid) MATCH FULL ON UPDATE NO ACTION - ON DELETE NO ACTION; + ON DELETE NO ACTION ) TABLESPACE pg_default; -ALTER TABLE IF EXISTS miovision_api.alerts +ALTER TABLE IF EXISTS miovision_api.alerts_new OWNER TO miovision_admins; -REVOKE ALL ON TABLE miovision_api.alerts FROM bdit_humans; -REVOKE ALL ON TABLE miovision_api.alerts FROM miovision_api_bot; +REVOKE ALL ON TABLE miovision_api.alerts_new FROM bdit_humans; +REVOKE ALL ON TABLE miovision_api.alerts_new FROM miovision_api_bot; -GRANT SELECT ON TABLE miovision_api.alerts TO bdit_humans; +GRANT SELECT ON TABLE miovision_api.alerts_new TO bdit_humans; -GRANT ALL ON TABLE miovision_api.alerts TO miovision_admins; +GRANT ALL ON TABLE miovision_api.alerts_new TO miovision_admins; -GRANT INSERT, SELECT, DELETE, UPDATE ON 
TABLE miovision_api.alerts TO miovision_api_bot; +GRANT INSERT, SELECT, DELETE, UPDATE ON TABLE miovision_api.alerts_new TO miovision_api_bot; -COMMENT ON TABLE miovision_api.alerts -IS 'This table contains Miovision alerts to 5 minute accuracy, -with maximum interval of 1 day. Pulled by a daily Airflow DAG `miovision_alerts`. -Note: a more detailed description is available on Miovision One.'; +COMMENT ON TABLE miovision_api.alerts_new IS E'' +'This table contains Miovision alerts pulled by a daily Airflow DAG `miovision_pull`, `pull_alerts` task. ' +'Note: a more detailed description is available on Miovision One.'; -COMMENT ON COLUMN miovision_api.alerts.intersection_id +COMMENT ON COLUMN miovision_api.alerts_new.intersection_id IS 'The intersection id, corresponding to intersections.intersection_id column'; -COMMENT ON COLUMN miovision_api.alerts.alert -IS 'Short text description of the alert. More detail on the different alerts can be found here: -https://help.miovision.com/s/article/Alert-and-Notification-Types'; - -COMMENT ON COLUMN miovision_api.alerts.start_time -IS 'First 5 minute interval at which the alert appeared. -Subtract 5 minutes to get earliest possible start time.'; - -COMMENT ON COLUMN miovision_api.alerts.end_time -IS 'Final 5 minute interval at which the alert appeared. -Add 5 minutes to get latest possible end time. Note if end -time is midnight, this could be extended on the following day.'; +COMMENT ON COLUMN miovision_api.alerts_new.alert IS E'' +'Short text description of the alert. 
More detail on the different alerts can be found here: +https://help.miovision.com/s/article/Alert-and-Notification-Types'; \ No newline at end of file From 5a51267fb4fff8b9726499d030cdd591471b5ff5 Mon Sep 17 00:00:00 2001 From: gabrielwol <80077912+gabrielwol@users.noreply.github.com> Date: Fri, 5 Jul 2024 19:05:26 +0000 Subject: [PATCH 4/9] #1007 replace alerts-new with alerts --- volumes/miovision/api/pull_alert.py | 6 +-- .../inserts/insert-miovision_alerts_new.sql | 4 +- .../sql/table/create-table-alerts.sql | 24 +++++------ .../sql/table/create-table-alerts_new.sql | 43 ------------------- .../sql/views/create-view-open_issues.sql | 2 +- 5 files changed, 18 insertions(+), 61 deletions(-) delete mode 100644 volumes/miovision/sql/table/create-table-alerts_new.sql diff --git a/volumes/miovision/api/pull_alert.py b/volumes/miovision/api/pull_alert.py index 5dfbf41fb..e07685ed9 100644 --- a/volumes/miovision/api/pull_alert.py +++ b/volumes/miovision/api/pull_alert.py @@ -147,7 +147,7 @@ def pull_alerts(conn: any, start_date: datetime, end_date: datetime, key: str): df = pd.DataFrame(alerts) dfs.append(df) logger.info('Done pulling. 
Transforming alerts.') - #create pandas df and restructure + #create pandas df and restructure final = pd.concat(dfs, ignore_index=True) final.rename(columns={0: "alertId", 1: "alertStartDateTime", 2: "alertEndDateTime", 3: "intersection_id", 4: "type"}, inplace = True) final.replace({np.NaN: None}, inplace = True) @@ -155,11 +155,11 @@ def pull_alerts(conn: any, start_date: datetime, end_date: datetime, key: str): values = list(final.itertuples(index=False, name=None)) #sql insert data script - fpath = os.path.join(SQL_DIR, 'inserts/insert-miovision_alerts_new.sql') + fpath = os.path.join(SQL_DIR, 'inserts/insert-miovision_alerts.sql') with open(fpath, 'r', encoding='utf-8') as file: insert_query = sql.SQL(file.read()) - logger.info('Inserting values into `miovision_api.alerts_new`.') + logger.info('Inserting values into `miovision_api.alerts`.') with conn.cursor() as cur: execute_values(cur, insert_query, values) diff --git a/volumes/miovision/sql/inserts/insert-miovision_alerts_new.sql b/volumes/miovision/sql/inserts/insert-miovision_alerts_new.sql index 32fd90c2f..720f23b16 100644 --- a/volumes/miovision/sql/inserts/insert-miovision_alerts_new.sql +++ b/volumes/miovision/sql/inserts/insert-miovision_alerts_new.sql @@ -1,4 +1,4 @@ -INSERT INTO miovision_api.alerts_new AS n ( +INSERT INTO miovision_api.alerts AS n ( alert_id, start_time, end_time, intersection_id, alert ) VALUES %s @@ -13,7 +13,7 @@ WHERE n.alert_id = EXCLUDED.alert_id; --update foreign key referencing miovision_api.intersections --handles new records as well as old records with null intersection_uid (newly added intersections) -UPDATE miovision_api.alerts_new AS n +UPDATE miovision_api.alerts AS n SET intersection_uid = i.intersection_uid FROM miovision_api.intersections AS i WHERE diff --git a/volumes/miovision/sql/table/create-table-alerts.sql b/volumes/miovision/sql/table/create-table-alerts.sql index ecf461d3d..8eb3d3754 100644 --- a/volumes/miovision/sql/table/create-table-alerts.sql +++ 
b/volumes/miovision/sql/table/create-table-alerts.sql @@ -1,8 +1,8 @@ --- Table: miovision_api.alerts_new +-- Table: miovision_api.alerts --- DROP TABLE IF EXISTS miovision_api.alerts_new; +-- DROP TABLE IF EXISTS miovision_api.alerts; -CREATE TABLE IF NOT EXISTS miovision_api.alerts_new +CREATE TABLE IF NOT EXISTS miovision_api.alerts ( alert_id text COLLATE pg_catalog."default" NOT NULL, intersection_id text COLLATE pg_catalog."default" NOT NULL, @@ -19,25 +19,25 @@ CREATE TABLE IF NOT EXISTS miovision_api.alerts_new TABLESPACE pg_default; -ALTER TABLE IF EXISTS miovision_api.alerts_new +ALTER TABLE IF EXISTS miovision_api.alerts OWNER TO miovision_admins; -REVOKE ALL ON TABLE miovision_api.alerts_new FROM bdit_humans; -REVOKE ALL ON TABLE miovision_api.alerts_new FROM miovision_api_bot; +REVOKE ALL ON TABLE miovision_api.alerts FROM bdit_humans; +REVOKE ALL ON TABLE miovision_api.alerts FROM miovision_api_bot; -GRANT SELECT ON TABLE miovision_api.alerts_new TO bdit_humans; +GRANT SELECT ON TABLE miovision_api.alerts TO bdit_humans; -GRANT ALL ON TABLE miovision_api.alerts_new TO miovision_admins; +GRANT ALL ON TABLE miovision_api.alerts TO miovision_admins; -GRANT INSERT, SELECT, DELETE, UPDATE ON TABLE miovision_api.alerts_new TO miovision_api_bot; +GRANT INSERT, SELECT, DELETE, UPDATE ON TABLE miovision_api.alerts TO miovision_api_bot; -COMMENT ON TABLE miovision_api.alerts_new IS E'' +COMMENT ON TABLE miovision_api.alerts IS E'' 'This table contains Miovision alerts pulled by a daily Airflow DAG `miovision_pull`, `pull_alerts` task. ' 'Note: a more detailed description is available on Miovision One.'; -COMMENT ON COLUMN miovision_api.alerts_new.intersection_id +COMMENT ON COLUMN miovision_api.alerts.intersection_id IS 'The intersection id, corresponding to intersections.intersection_id column'; -COMMENT ON COLUMN miovision_api.alerts_new.alert IS E'' +COMMENT ON COLUMN miovision_api.alerts.alert IS E'' 'Short text description of the alert. 
More detail on the different alerts can be found here: https://help.miovision.com/s/article/Alert-and-Notification-Types'; \ No newline at end of file diff --git a/volumes/miovision/sql/table/create-table-alerts_new.sql b/volumes/miovision/sql/table/create-table-alerts_new.sql deleted file mode 100644 index 2a3a0959d..000000000 --- a/volumes/miovision/sql/table/create-table-alerts_new.sql +++ /dev/null @@ -1,43 +0,0 @@ --- Table: miovision_api.alerts_new - --- DROP TABLE IF EXISTS miovision_api.alerts_new; - -CREATE TABLE IF NOT EXISTS miovision_api.alerts_new -( - alert_id text COLLATE pg_catalog."default" NOT NULL, - intersection_id text COLLATE pg_catalog."default" NOT NULL, - alert text COLLATE pg_catalog."default" NOT NULL, - start_time timestamp without time zone NOT NULL, - end_time timestamp without time zone, - intersection_uid integer, - CONSTRAINT miovision_alerts_pkey_new PRIMARY KEY (alert_id), - CONSTRAINT miov_alert_intersection_fkey_new FOREIGN KEY (intersection_uid) - REFERENCES miovision_api.intersections (intersection_uid) MATCH FULL - ON UPDATE NO ACTION - ON DELETE NO ACTION -) - -TABLESPACE pg_default; - -ALTER TABLE IF EXISTS miovision_api.alerts_new -OWNER TO miovision_admins; - -REVOKE ALL ON TABLE miovision_api.alerts_new FROM bdit_humans; -REVOKE ALL ON TABLE miovision_api.alerts_new FROM miovision_api_bot; - -GRANT SELECT ON TABLE miovision_api.alerts_new TO bdit_humans; - -GRANT ALL ON TABLE miovision_api.alerts_new TO miovision_admins; - -GRANT INSERT, SELECT, DELETE, UPDATE ON TABLE miovision_api.alerts_new TO miovision_api_bot; - -COMMENT ON TABLE miovision_api.alerts_new IS E'' -'This table contains Miovision alerts pulled by a daily Airflow DAG `miovision_pull`, `pull_alerts` task. 
' -'Note: a more detailed description is available on Miovision One.'; - -COMMENT ON COLUMN miovision_api.alerts_new.intersection_id -IS 'The intersection id, corresponding to intersections.intersection_id column'; - -COMMENT ON COLUMN miovision_api.alerts_new.alert IS E'' -'Short text description of the alert. More detail on the different alerts can be found here: -https://help.miovision.com/s/article/Alert-and-Notification-Types'; diff --git a/volumes/miovision/sql/views/create-view-open_issues.sql b/volumes/miovision/sql/views/create-view-open_issues.sql index 76ca304f7..d8f4a4ddc 100644 --- a/volumes/miovision/sql/views/create-view-open_issues.sql +++ b/volumes/miovision/sql/views/create-view-open_issues.sql @@ -6,7 +6,7 @@ WITH alerts AS ( ar.uid, string_agg(DISTINCT alerts.alert, '; ') AS alerts FROM miovision_api.anomalous_ranges AS ar - LEFT JOIN miovision_api.alerts_new AS alerts + LEFT JOIN miovision_api.alerts ON alerts.intersection_uid = ar.intersection_uid AND alerts.start_time >= ar.range_start AND ( From bd4928e171c440e6cba69d0d3dc7739e87dee60a Mon Sep 17 00:00:00 2001 From: gabrielwol <80077912+gabrielwol@users.noreply.github.com> Date: Fri, 5 Jul 2024 19:06:16 +0000 Subject: [PATCH 5/9] #1007 rename file --- ...nsert-miovision_alerts_new.sql => insert-miovision_alerts.sql} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename volumes/miovision/sql/inserts/{insert-miovision_alerts_new.sql => insert-miovision_alerts.sql} (100%) diff --git a/volumes/miovision/sql/inserts/insert-miovision_alerts_new.sql b/volumes/miovision/sql/inserts/insert-miovision_alerts.sql similarity index 100% rename from volumes/miovision/sql/inserts/insert-miovision_alerts_new.sql rename to volumes/miovision/sql/inserts/insert-miovision_alerts.sql From ff483c97a764d625ef07b306c122bda72b89f5d5 Mon Sep 17 00:00:00 2001 From: gabrielwol <80077912+gabrielwol@users.noreply.github.com> Date: Fri, 5 Jul 2024 19:09:31 +0000 Subject: [PATCH 6/9] #1007 replace open_issues with 
master branch --- .../sql/views/create-view-open_issues.sql | 32 ++++--------------- 1 file changed, 7 insertions(+), 25 deletions(-) diff --git a/volumes/miovision/sql/views/create-view-open_issues.sql b/volumes/miovision/sql/views/create-view-open_issues.sql index d8f4a4ddc..8fc959feb 100644 --- a/volumes/miovision/sql/views/create-view-open_issues.sql +++ b/volumes/miovision/sql/views/create-view-open_issues.sql @@ -1,21 +1,6 @@ --DROP VIEW miovision_api.open_issues; CREATE OR REPLACE VIEW miovision_api.open_issues AS -WITH alerts AS ( - SELECT - ar.uid, - string_agg(DISTINCT alerts.alert, '; ') AS alerts - FROM miovision_api.anomalous_ranges AS ar - LEFT JOIN miovision_api.alerts - ON alerts.intersection_uid = ar.intersection_uid - AND alerts.start_time >= ar.range_start - AND ( - alerts.end_time < ar.range_end - OR ar.range_end IS NULL - ) - GROUP BY ar.uid -) - SELECT ar.uid, ar.intersection_uid, @@ -28,12 +13,11 @@ SELECT WHEN ar.classification_uid IS NULL THEN 'All modes' ELSE c.classification END, - ar.leg, ar.range_start::date, (current_timestamp AT TIME ZONE 'EST5EDT')::date - ar.range_start::date AS num_days, ar.notes, SUM(v.volume) AS last_week_volume, - alerts.alerts + string_agg(DISTINCT alerts.alert, '; ') AS alerts FROM miovision_api.anomalous_ranges AS ar --keep rows with null classification_uid LEFT JOIN miovision_api.classifications AS c USING (classification_uid) @@ -42,19 +26,18 @@ JOIN miovision_api.intersections AS i USING (intersection_uid) --find last week volume LEFT JOIN miovision_api.volumes AS v ON ar.intersection_uid = v.intersection_uid - --volume within the last 7 days and after AR started - AND v.datetime_bin >= ar.range_start - --prune the partitions AND v.datetime_bin >= current_date - interval '7 days' AND ( ar.classification_uid = v.classification_uid OR ar.classification_uid IS NULL ) +LEFT JOIN miovision_api.alerts + ON alerts.intersection_id = i.id + AND alerts.start_time >= ar.range_start AND ( - ar.leg = v.leg - OR ar.leg 
IS NULL + alerts.end_time < ar.range_end + OR ar.range_end IS NULL ) -LEFT JOIN alerts ON alerts.uid = ar.uid WHERE ar.problem_level <> 'valid-caveat' --currently active @@ -73,8 +56,7 @@ GROUP BY ar.classification_uid, c.classification, ar.range_start, - ar.notes, - alerts.alerts + ar.notes ORDER BY ar.intersection_uid, ar.range_start, From 4883544d29210905deca90c116f49c9f4433b07d Mon Sep 17 00:00:00 2001 From: gabrielwol <80077912+gabrielwol@users.noreply.github.com> Date: Fri, 5 Jul 2024 19:27:31 +0000 Subject: [PATCH 7/9] #1007 update readmes with alerts changes --- volumes/miovision/api/readme.md | 24 ++++++++++++++++++++---- volumes/miovision/sql/readme.md | 28 ++++++++++++++++++++++------ 2 files changed, 42 insertions(+), 10 deletions(-) diff --git a/volumes/miovision/api/readme.md b/volumes/miovision/api/readme.md index 44ea4c2bb..3f72fd535 100644 --- a/volumes/miovision/api/readme.md +++ b/volumes/miovision/api/readme.md @@ -8,7 +8,8 @@ - [Error responses](#error-responses) - [Input Files](#input-files) - [How to run the api](#how-to-run-the-api) - - [Command Line Options](#command-line-options) + - [TMCs (Volumes)](#tmcs-volumes) + - [Alerts](#alerts) - [Classifications](#classifications) - [API Classifications](#api-classifications) - [Old Classifications (csv dumps and datalink)](#old-classifications-csv-dumps-and-datalink) @@ -25,6 +26,7 @@ - [Notes](#notes) <!-- /TOC --> +<!-- /TOC --> # Overview This readme contains information on the script used to pull data from the Miovision `intersection_tmc` API and descriptions of the Airflow DAGs which make use of the API scripts and [sql functions](../sql/readme.md#postgresql-functions) to pull, aggregate, and run data quality checks on new. @@ -115,11 +117,13 @@ password={password} ## How to run the api -The process to use the API to download data is typically run through the daily [miovision_pull Airflow DAG](../../../dags/miovision_pull.py). However it can also be run through the command line. 
This can be useful when adding new intersections, or when troubleshooting. +### TMCs (Volumes) + +The process to use the API to download volumes data is typically run through the daily [miovision_pull Airflow DAG](../../../dags/miovision_pull.py). However it can also be run through the command line. This can be useful when adding new intersections, or when troubleshooting. In command prompt, navigate to the folder where the python file is [located](../api/) and run `python3 intersection_tmc.py run-api ...` with various command line options listed below. For example, to download and aggregate data from a custom date range, run `python3 intersection_tmc.py run-api --pull --agg --start_date=YYYY-MM-DD --end_date=YYYY-MM-DD`. The start and end variables will indicate the start and end date to pull data from the api. -### Command Line Options +**TMC Command Line Options** |Option|Format|Description|Example|Default| |-----|-------|-----|-----|-----| @@ -137,6 +141,18 @@ In command prompt, navigate to the folder where the python file is [located](../ The `--pull` and `--agg` commands allow us to run data pulling and aggregation together or independently, which is useful for when we want to check out the data before doing any processing. For example, when we are [finding valid intersection movements for new intersections](../update_intersections/readme.md#update-miovision_apiintersection_movements). +### Alerts + +Although it it typically run daily through the Airflow DAG [miovision_pull](../../../dags/miovision_pull.py) `pull_alerts` task, you can also pull from the Alerts API using the command line. This is helpful for backfilling multiple dates at once. An example command is: +`python3 pull_alert.py run-alerts-api-cli --start_date=2024-06-01 --end_date=2024-07-01` + +**Alerts Command Line Options** + +|Option|Format|Description|Example|Default| +|-----|-------|-----|-----|-----| +|start_date|YYYY-MM-DD|Specifies the start date to pull data from. Inclusive. 
|2018-08-01|The previous day| +|end_date|YYYY-MM-DD|Specifies the end date to pull data from. Must be at least 1 day after `start_date` and cannot be a future date. Exclusive. |2018-08-05|Today| + ## Classifications The classification given in the api is different than the ones given in the csv dumps, or the datalink. @@ -259,7 +275,7 @@ This updated Miovision DAG runs daily at 3am. The pull data tasks and subsequent - `create_month_partition` contains any partition creates necessary for a new month. `pull_miovision` pulls data from the API and inserts into `miovision_api.volumes` using `intersection_tmc.pull_data` function. -- `pull_alerts` pulls alerts from the API at 5 minute increments and inserts into [`miovision_api.alerts`](../sql/readme.md#alerts), extending existing alerts. The records are de-dupped (duplicates are a result of the short-form alert title used by the API), sorted, and runs are identified to identify the approximate alert start/end time. Before inserting, records are first used to update the end time of alerts that are continuous with existing alerts. +- `pull_alerts` pulls alerts occurring on this day from the API and inserts into [`miovision_api.alerts`](../sql/readme.md#alerts), updating `end_time` of existing alerts. ### `miovision_agg` TaskGroup This task group completes various Miovision aggregations. diff --git a/volumes/miovision/sql/readme.md b/volumes/miovision/sql/readme.md index 07da03c99..547623c3c 100644 --- a/volumes/miovision/sql/readme.md +++ b/volumes/miovision/sql/readme.md @@ -1,4 +1,5 @@ <!-- TOC --> + - [1. Overview](#1-overview) - [2. 
`miovision_api` Table Structure](#2-miovision_api-table-structure) - [Miovision Data Relationships at a Glance](#miovision-data-relationships-at-a-glance) @@ -39,6 +40,20 @@ - [An applied example](#an-applied-example) - [Identifying new anomalies](#identifying-new-anomalies) +<!-- /TOC --> +- [PostgreSQL Functions](#postgresql-functions) + - [Aggregation Functions](#aggregation-functions) + - [Clear Functions](#clear-functions) + - [Helper Functions](#helper-functions) + - [Partitioning Functions](#partitioning-functions) + - [Deprecated Functions](#deprecated-functions) +- [3. Finding Gaps and Malfunctioning Camera](#3-finding-gaps-and-malfunctioning-camera) + - [Part I - Unacceptable Gaps](#part-i---unacceptable-gaps) + - [Part II - Working Machine](#part-ii---working-machine) + - [Identifying Questionable Data Quality](#identifying-questionable-data-quality) + - [An applied example](#an-applied-example) + - [Identifying new anomalies](#identifying-new-anomalies) + <!-- /TOC --> # 1. Overview @@ -426,25 +441,26 @@ leg| text | A segment that forms part of a miovision intersection, identified by ### `alerts` -This table contains alerts for Miovision intersections pulled daily from the API by the `pull_alerts` task in the [`miovision_pull` DAG](../api/readme.md#miovision_pull). Due to the API structure, alerts are only queried every 5 minutes, meaning accuracy is limited and a short alert lasting less than 5 minutes could be missed entirely - or an alert ending and then beginning again within the same 5 minute interval would be merged into one. Higher accuracy alert records could be found in Miovision Alert emails or in the in the [Miovision One UI](https://miovision.one/intersection-monitoring/#/alerts). +This table contains alerts for Miovision intersections pulled daily from the API by the `pull_alerts` task in the [`miovision_pull` DAG](../api/readme.md#miovision_pull). Both ongoing and closed issues that intersect with the current day are pulled. 
The alerts can also be found in Miovision Alert emails or in the [Miovision One UI](https://miovision.one/intersection-monitoring/#/alerts). **Field Name**|**Data Type**|**Description**|**Example**| :-----|:-----|:-----|:-----| +alert_id | text | A unique id for the alert, from the API. | 75dc5b77-faa4-487e-a3de-a6b11358fdf5 | intersection_id | text | The intersection id, corresponding to intersections.intersection_id column | c04704a0-e1e2-4101-9c29-6823d0f41c52 | intersection_uid | integer | The intersection uid, a foreign key referencing intersections.intersection_uid column | 6 | alert | text | Short text description of the alert. More detail and tips for resolution are available in this [Miovision help article](https://help.miovision.com/s/article/Alert-and-Notification-Types) | PERIPHERAL_UNAVAILABLE | -start_time | timestamp | First 5 minute interval at which the alert appeared. **Subtract 5 minutes to get earliest possible start time.** | 2024-01-12 10:20:00 | -end_time | timestamp | Final 5 minute interval at which the alert appeared. **Add 5 minutes to get latest possible end time.** Note this could be extended the following day. | 2024-01-21 15:35:00 | +start_time | timestamp | Start of the alert. | 2024-01-12 10:20:00 | +end_time | timestamp | End of the alert. Note that a null end_time means the alert is ongoing. | 2024-01-21 15:35:00 | -Below is an example of how to anti-join the alerts table, including the 5 minute buffer: +Below is an example of how to anti-join the alerts table: ```sql SELECT ... 
FROM miovision_api.volumes AS v --anti join alerts LEFT JOIN miovision_api.alerts AS a ON a.intersection_uid = v.intersection_uid - AND v.datetime_bin >= a.start_time - interval '5 minutes' - AND v.datetime_bin < a.end_time + interval '5 minutes' + AND v.datetime_bin >= a.start_time + AND v.datetime_bin < a.end_time WHERE a.intersection_uid IS NULL ``` From 9cb268ccbf29fb8033aeca28491328bf2fdc999a Mon Sep 17 00:00:00 2001 From: gabrielwol <80077912+gabrielwol@users.noreply.github.com> Date: Fri, 5 Jul 2024 19:28:41 +0000 Subject: [PATCH 8/9] #1007 fix TOC --- volumes/miovision/sql/readme.md | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/volumes/miovision/sql/readme.md b/volumes/miovision/sql/readme.md index 547623c3c..c27a4e4bf 100644 --- a/volumes/miovision/sql/readme.md +++ b/volumes/miovision/sql/readme.md @@ -40,20 +40,6 @@ - [An applied example](#an-applied-example) - [Identifying new anomalies](#identifying-new-anomalies) -<!-- /TOC --> -- [PostgreSQL Functions](#postgresql-functions) - - [Aggregation Functions](#aggregation-functions) - - [Clear Functions](#clear-functions) - - [Helper Functions](#helper-functions) - - [Partitioning Functions](#partitioning-functions) - - [Deprecated Functions](#deprecated-functions) -- [3. Finding Gaps and Malfunctioning Camera](#3-finding-gaps-and-malfunctioning-camera) - - [Part I - Unacceptable Gaps](#part-i---unacceptable-gaps) - - [Part II - Working Machine](#part-ii---working-machine) - - [Identifying Questionable Data Quality](#identifying-questionable-data-quality) - - [An applied example](#an-applied-example) - - [Identifying new anomalies](#identifying-new-anomalies) - <!-- /TOC --> # 1. 
Overview From 51aa565ec45f4633b5cdfc4ecc428bee367b9c05 Mon Sep 17 00:00:00 2001 From: gabrielwol <80077912+gabrielwol@users.noreply.github.com> Date: Mon, 15 Jul 2024 09:19:46 -0400 Subject: [PATCH 9/9] #1007 mention airflow venv requirement --- volumes/miovision/api/readme.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/volumes/miovision/api/readme.md b/volumes/miovision/api/readme.md index 3f72fd535..14c8bb995 100644 --- a/volumes/miovision/api/readme.md +++ b/volumes/miovision/api/readme.md @@ -143,7 +143,7 @@ The `--pull` and `--agg` commands allow us to run data pulling and aggregation t ### Alerts -Although it it typically run daily through the Airflow DAG [miovision_pull](../../../dags/miovision_pull.py) `pull_alerts` task, you can also pull from the Alerts API using the command line. This is helpful for backfilling multiple dates at once. An example command is: +Although it is typically run daily through the Airflow DAG [miovision_pull](../../../dags/miovision_pull.py) `pull_alerts` task, you can also pull from the Alerts API using the command line within the airflow venv (since Airflow Connections are used for database connection and API key). This is helpful for backfilling multiple dates at once. An example command is: `python3 pull_alert.py run-alerts-api-cli --start_date=2024-06-01 --end_date=2024-07-01` **Alerts Command Line Options**