From 5b9148d9150178f7c44928a7d9a60576a62b227f Mon Sep 17 00:00:00 2001 From: gabrielwol <80077912+gabrielwol@users.noreply.github.com> Date: Fri, 5 Jul 2024 18:48:52 +0000 Subject: [PATCH] #1007 new alerts api schema --- dags/miovision_pull.py | 22 +- volumes/miovision/api/pull_alert.py | 195 ++++++++---------- .../inserts/insert-miovision_alerts_new.sql | 21 ++ .../sql/table/create-table-alerts_new.sql | 43 ++++ 4 files changed, 159 insertions(+), 122 deletions(-) create mode 100644 volumes/miovision/sql/inserts/insert-miovision_alerts_new.sql create mode 100644 volumes/miovision/sql/table/create-table-alerts_new.sql diff --git a/dags/miovision_pull.py b/dags/miovision_pull.py index 24e86bac0..7d5ac8165 100644 --- a/dags/miovision_pull.py +++ b/dags/miovision_pull.py @@ -29,7 +29,7 @@ pull_data, find_gaps, aggregate_15_min_mvt, aggregate_15_min, aggregate_volumes_daily, get_report_dates, get_intersection_info, agg_zero_volume_anomalous_ranges ) - from volumes.miovision.api.pull_alert import pull_alerts + from volumes.miovision.api.pull_alert import run_alerts_api except: raise ImportError("Cannot import DAG helper functions.") @@ -114,18 +114,12 @@ def pull_miovision(ds = None, **context): with mio_postgres.get_conn() as conn: pull_data(conn, start_time, end_time, INTERSECTION, key) - @task(task_id = 'pull_alerts', trigger_rule='none_failed', retries = 1) - def pull_alerts_task(ds): - CONFIG = configparser.ConfigParser() - CONFIG.read(API_CONFIG_PATH) - api_key=CONFIG['API'] - key=api_key['key'] - start_date = dateutil.parser.parse(str(ds)) - end_date = dateutil.parser.parse(str(ds_add(ds, 1))) - mio_postgres = PostgresHook("miovision_api_bot") - - with mio_postgres.get_conn() as conn: - pull_alerts(conn, start_date, end_date, key) + @task(trigger_rule='none_failed', retries = 1) + def pull_alerts(ds): + run_alerts_api( + start_date=ds, + end_date=ds_add(ds, 1) + ) @task_group(tooltip="Tasks to aggregate newly pulled Miovision data.") def miovision_agg(): @@ -266,7 +260,7 @@ def data_checks(): ( check_partitions() >> - [pull_miovision(), pull_alerts_task()] >> + [pull_miovision(), pull_alerts()] >> miovision_agg() >> t_done >> data_checks() diff --git a/volumes/miovision/api/pull_alert.py b/volumes/miovision/api/pull_alert.py index b25a30006..5dfbf41fb 100644 --- a/volumes/miovision/api/pull_alert.py +++ b/volumes/miovision/api/pull_alert.py @@ -1,12 +1,19 @@ +import os import datetime +import dateutil +import pytz import json from requests import Session import logging -import configparser import pandas as pd -from psycopg2 import connect +import numpy as np +import click +from psycopg2 import sql from psycopg2.extras import execute_values +from airflow.hooks.base_hook import BaseHook +from airflow.providers.postgres.hooks.postgres import PostgresHook + def logger(): logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) @@ -21,15 +28,37 @@ def logger(): session = Session() session.proxies = {} -url = 'https://api.miovision.com/alerts/' +URL_BASE = 'https://api.miovision.one/api/v1' +TZ = pytz.timezone("Canada/Eastern") +time_delta = datetime.timedelta(days=1) +default_start = str(datetime.date.today()-time_delta) +default_end = str(datetime.date.today()) + +SQL_DIR = os.path.join(os.path.dirname(os.path.abspath(os.path.dirname(__file__))), 'sql') + +CONTEXT_SETTINGS = dict( + default_map={'run_alerts_api_cli': {'flag': 0}} +) + +def run_alerts_api(start_date: str, end_date: str): + api_key = BaseHook.get_connection('miovision_api_key') + key = api_key.extra_dejson['key'] + mio_postgres = PostgresHook("miovision_api_bot") + start_date = dateutil.parser.parse(str(start_date)) + end_date = dateutil.parser.parse(str(end_date)) + with mio_postgres.get_conn() as conn: + pull_alerts(conn, start_date, end_date, key) + +@click.group(context_settings=CONTEXT_SETTINGS) +def cli(): + pass + +@cli.command() +@click.option('--start_date', default=default_start, help='format is YYYY-MM-DD for start date') +@click.option('--end_date' , default=default_end, help='format is YYYY-MM-DD for end date & excluding the day itself') -CONFIG = configparser.ConfigParser() -CONFIG.read('/etc/airflow/data_scripts/volumes/miovision/api/config.cfg') -api_key=CONFIG['API'] -key=api_key['key'] -dbset = CONFIG['DBSETTINGS'] -conn = connect(**dbset) -conn.autocommit = True +def run_alerts_api_cli(start_date, end_date): + return run_alerts_api(start_date, end_date) class MiovAlertPuller: """Miovision API puller. @@ -44,77 +73,46 @@ class MiovAlertPuller: key : str Miovision API access key. """ - headers = {'Content-Type': 'application/json', - 'Authorization': ''} - - def __init__(self, active_time, key): - self.active_time = active_time - self.headers['Authorization'] = key - + 'apikey': ''} + def __init__(self, url, start_time, end_time, key): + self.url = url + self.start_time = start_time.isoformat() + self.end_time = end_time.isoformat() + self.headers['apikey'] = key def get_response(self): """Requests data from API.""" - - params = {'activeAtTime': self.active_time} - + params = {'startDateTime': self.start_time, + 'endDateTime': self.end_time} response = session.get( - url, + url=self.url, params=params, - headers=self.headers, - proxies=session.proxies) - + headers=self.headers + ) # Return if we get a success response code, or raise an error if not. if response.status_code == 200: return response elif response.status_code == 401: raise Exception('Error' + str(response.status_code)) - + elif response.status_code == 404: + raise Exception('Error' + str(response.status_code)) + def process_timestamp(self, utc_timestamp): + if utc_timestamp is None: + return None + utc_timestamp = dateutil.parser.parse(str(utc_timestamp)) + #convert timestamp from utc to local TZ + local_timestamp = utc_timestamp.replace(tzinfo=pytz.utc).astimezone(TZ) + return local_timestamp.replace(tzinfo=None) def process_alert(self, row): """Process one row of Alert API output.""" - # Return time, classification_uid, leg, movement, volume. - return (self.active_time, row['intersectionId'], row['type']) - + start_timestamp = self.process_timestamp(row['alertStartDateTime']) + end_timestamp = self.process_timestamp(row['alertEndDateTime']) + return (row['alertId'], start_timestamp, end_timestamp, row['intersectionId'], row['type']) def process_response(self, response): """Process the output of self.get_response.""" data = json.loads(response.content.decode('utf-8')) - return [self.process_alert(row) for row in data] - -#insert data script -insert_data = """WITH new_values AS ( - SELECT intersection_id::text, alert::text, start_time::timestamp, end_time::timestamp - FROM (VALUES %s) AS new_values(intersection_id, alert, start_time, end_time) -), - ---extend existing alerts -updated AS ( - UPDATE miovision_api.alerts - SET end_time = new_values.end_time - FROM new_values - WHERE - alerts.intersection_id = new_values.intersection_id - AND alerts.alert = new_values.alert - --where old end = new start - AND alerts.end_time = new_values.start_time - --returns the new values used for updates (to be excluded from insert) - RETURNING new_values.* -) - ---insert new alerts (exclude updated) -INSERT INTO miovision_api.alerts (intersection_id, alert, start_time, end_time) -SELECT intersection_id, alert, start_time, end_time FROM new_values -EXCEPT -SELECT intersection_id, alert, start_time, end_time FROM updated -ON CONFLICT (intersection_id, alert, start_time) DO NOTHING; - ---update foreign key referencing miovision_api.intersections ---handles new records as well as old records with null intersection_uid (newly added intersections) -UPDATE miovision_api.alerts -SET intersection_uid = i.intersection_uid -FROM miovision_api.intersections AS i -WHERE alerts.intersection_id = i.id -AND alerts.intersection_uid IS NULL; -""" + return [self.process_alert(row) for row in data['alerts']], data['links']['next'] def pull_alerts(conn: any, start_date: datetime, end_date: datetime, key: str): """Miovision Alert Puller @@ -133,56 +131,37 @@ def pull_alerts(conn: any, start_date: datetime, end_date: datetime, key: str): key : str Miovision API access key. """ - - STEP_SIZE = datetime.timedelta(minutes=5) - - #establish list of timestamps to iterate over + start_date = TZ.localize(start_date) + end_date = TZ.localize(end_date) if end_date < start_date: raise ValueError('end_time is not greater than start_time.') - dt = start_date - dt_list = [dt, ] - while(dt < end_date): - dt = dt + STEP_SIZE - dt_list.append(dt) - logger.info('Pulling Miovision alerts from %s to %s.', start_date, end_date) - - #pull alerts at each timestamps and append to list + #pull alerts from each page and append to list dfs = [] - for dt in dt_list: - miovpull = MiovAlertPuller(dt, key) - test = miovpull.get_response() - response = miovpull.process_response(test) - df = pd.DataFrame(response).drop_duplicates() + pageSize = 100 + url = f"{URL_BASE}/alerts?pageSize={pageSize}&pageNumber=0" + while url is not None: + miovpull = MiovAlertPuller(url, start_date, end_date, key) + response = miovpull.get_response() + alerts, url = miovpull.process_response(response) + df = pd.DataFrame(alerts) dfs.append(df) - logger.info('Done pulling. Transforming alerts.') - #create pandas df and restructure final = pd.concat(dfs, ignore_index=True) - final.rename(columns={0: "time", 1: "intersection_id", 2: "alert"}, inplace = True) - final['time'] = pd.to_datetime(final['time']) - final.sort_values(by=['intersection_id', 'alert', 'time'], inplace = True) - final.reset_index(drop = True) - final.drop_duplicates(inplace = True, ignore_index = True) - - #group by alert and time - final['shifted'] = final.groupby(by = ['intersection_id', 'alert'])['time'].shift(1) - final['time_lag'] = final['time'] - final['shifted'] - final['start_time'] = final['time'] - - #iterate through and check if consecutive - for index, row in final.iterrows(): - if (row['time_lag'] == pd.Timedelta(f"{STEP_SIZE} minutes")): #lag = step size - final.at[index, 'start_time'] = final.at[index-1, 'start_time'] #assign previous start time - - #find start and end time of consecutive values - summary = final.groupby(by = ['intersection_id', 'alert', 'start_time']).agg({'time': ['max']}) - summary = summary.reset_index() - + final.rename(columns={0: "alertId", 1: "alertStartDateTime", 2: "alertEndDateTime", 3: "intersection_id", 4: "type"}, inplace = True) + final.replace({np.NaN: None}, inplace = True) #convert to tuples for inserting - values = list(summary.itertuples(index=False, name=None)) - - logger.info('Inserting values into `miovision_api.alerts`.') + values = list(final.itertuples(index=False, name=None)) + + #sql insert data script + fpath = os.path.join(SQL_DIR, 'inserts/insert-miovision_alerts_new.sql') + with open(fpath, 'r', encoding='utf-8') as file: + insert_query = sql.SQL(file.read()) + + logger.info('Inserting values into `miovision_api.alerts_new`.') with conn.cursor() as cur: - execute_values(cur, insert_data, values) \ No newline at end of file + execute_values(cur, insert_query, values) + +if __name__ == '__main__': + cli() \ No newline at end of file diff --git a/volumes/miovision/sql/inserts/insert-miovision_alerts_new.sql b/volumes/miovision/sql/inserts/insert-miovision_alerts_new.sql new file mode 100644 index 000000000..32fd90c2f --- /dev/null +++ b/volumes/miovision/sql/inserts/insert-miovision_alerts_new.sql @@ -0,0 +1,21 @@ +INSERT INTO miovision_api.alerts_new AS n ( + alert_id, start_time, end_time, intersection_id, alert +) +VALUES %s +ON CONFLICT (alert_id) +DO UPDATE +SET + intersection_id = EXCLUDED.intersection_id, + alert = EXCLUDED.alert, + start_time = EXCLUDED.start_time, + end_time = EXCLUDED.end_time +WHERE n.alert_id = EXCLUDED.alert_id; + +--update foreign key referencing miovision_api.intersections +--handles new records as well as old records with null intersection_uid (newly added intersections) +UPDATE miovision_api.alerts_new AS n +SET intersection_uid = i.intersection_uid +FROM miovision_api.intersections AS i +WHERE + n.intersection_id = i.id + AND n.intersection_uid IS NULL; \ No newline at end of file diff --git a/volumes/miovision/sql/table/create-table-alerts_new.sql b/volumes/miovision/sql/table/create-table-alerts_new.sql new file mode 100644 index 000000000..2a3a0959d --- /dev/null +++ b/volumes/miovision/sql/table/create-table-alerts_new.sql @@ -0,0 +1,43 @@ +-- Table: miovision_api.alerts_new + +-- DROP TABLE IF EXISTS miovision_api.alerts_new; + +CREATE TABLE IF NOT EXISTS miovision_api.alerts_new +( + alert_id text COLLATE pg_catalog."default" NOT NULL, + intersection_id text COLLATE pg_catalog."default" NOT NULL, + alert text COLLATE pg_catalog."default" NOT NULL, + start_time timestamp without time zone NOT NULL, + end_time timestamp without time zone, + intersection_uid integer, + CONSTRAINT miovision_alerts_pkey_new PRIMARY KEY (alert_id), + CONSTRAINT miov_alert_intersection_fkey_new FOREIGN KEY (intersection_uid) + REFERENCES miovision_api.intersections (intersection_uid) MATCH FULL + ON UPDATE NO ACTION + ON DELETE NO ACTION +) + +TABLESPACE pg_default; + +ALTER TABLE IF EXISTS miovision_api.alerts_new +OWNER TO miovision_admins; + +REVOKE ALL ON TABLE miovision_api.alerts_new FROM bdit_humans; +REVOKE ALL ON TABLE miovision_api.alerts_new FROM miovision_api_bot; + +GRANT SELECT ON TABLE miovision_api.alerts_new TO bdit_humans; + +GRANT ALL ON TABLE miovision_api.alerts_new TO miovision_admins; + +GRANT INSERT, SELECT, DELETE, UPDATE ON TABLE miovision_api.alerts_new TO miovision_api_bot; + +COMMENT ON TABLE miovision_api.alerts_new IS E'' +'This table contains Miovision alerts pulled by a daily Airflow DAG `miovision_pull`, `pull_alerts` task. ' +'Note: a more detailed description is available on Miovision One.'; + +COMMENT ON COLUMN miovision_api.alerts_new.intersection_id +IS 'The intersection id, corresponding to intersections.intersection_id column'; + +COMMENT ON COLUMN miovision_api.alerts_new.alert IS E'' +'Short text description of the alert. More detail on the different alerts can be found here: +https://help.miovision.com/s/article/Alert-and-Notification-Types';