#1007 new alerts api schema
gabrielwol committed Jul 5, 2024

Verified: this commit was signed with the committer's verified signature (narbs91 Narb).
1 parent b80a161 commit 5b9148d
Showing 4 changed files with 159 additions and 122 deletions.
22 changes: 8 additions & 14 deletions dags/miovision_pull.py
@@ -29,7 +29,7 @@
pull_data, find_gaps, aggregate_15_min_mvt, aggregate_15_min, aggregate_volumes_daily,
get_report_dates, get_intersection_info, agg_zero_volume_anomalous_ranges
)
from volumes.miovision.api.pull_alert import pull_alerts
from volumes.miovision.api.pull_alert import run_alerts_api
except:
raise ImportError("Cannot import DAG helper functions.")

@@ -114,18 +114,12 @@ def pull_miovision(ds = None, **context):
with mio_postgres.get_conn() as conn:
pull_data(conn, start_time, end_time, INTERSECTION, key)

@task(task_id = 'pull_alerts', trigger_rule='none_failed', retries = 1)
def pull_alerts_task(ds):
CONFIG = configparser.ConfigParser()
CONFIG.read(API_CONFIG_PATH)
api_key=CONFIG['API']
key=api_key['key']
start_date = dateutil.parser.parse(str(ds))
end_date = dateutil.parser.parse(str(ds_add(ds, 1)))
mio_postgres = PostgresHook("miovision_api_bot")

with mio_postgres.get_conn() as conn:
pull_alerts(conn, start_date, end_date, key)
@task(trigger_rule='none_failed', retries = 1)
def pull_alerts(ds):
run_alerts_api(
start_date=ds,
end_date=ds_add(ds, 1)
)

@task_group(tooltip="Tasks to aggregate newly pulled Miovision data.")
def miovision_agg():
@@ -266,7 +260,7 @@ def data_checks():

(
check_partitions() >>
[pull_miovision(), pull_alerts_task()] >>
[pull_miovision(), pull_alerts()] >>
miovision_agg() >>
t_done >>
data_checks()
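
The refactored task above delegates credential lookup and the database connection to run_alerts_api, so the same pull can be triggered outside the DAG. A minimal sketch of a direct call, assuming the Airflow connections referenced in pull_alert.py (miovision_api_key, miovision_api_bot) exist; the dates are illustrative:

from volumes.miovision.api.pull_alert import run_alerts_api

# Pull one day of alerts, mirroring the ds / ds_add(ds, 1) window used by the pull_alerts task.
run_alerts_api(start_date='2024-07-04', end_date='2024-07-05')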
195 changes: 87 additions & 108 deletions volumes/miovision/api/pull_alert.py
@@ -1,12 +1,19 @@
import os
import datetime
import dateutil
import pytz
import json
from requests import Session
import logging
import configparser
import pandas as pd
from psycopg2 import connect
import numpy as np
import click
from psycopg2 import sql
from psycopg2.extras import execute_values

from airflow.hooks.base_hook import BaseHook
from airflow.providers.postgres.hooks.postgres import PostgresHook

def logger():
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
@@ -21,15 +28,37 @@ def logger():

session = Session()
session.proxies = {}
url = 'https://api.miovision.com/alerts/'
URL_BASE = 'https://api.miovision.one/api/v1'
TZ = pytz.timezone("Canada/Eastern")
time_delta = datetime.timedelta(days=1)
default_start = str(datetime.date.today()-time_delta)
default_end = str(datetime.date.today())

SQL_DIR = os.path.join(os.path.dirname(os.path.abspath(os.path.dirname(__file__))), 'sql')

CONTEXT_SETTINGS = dict(
default_map={'run_alerts_api_cli': {'flag': 0}}
)

def run_alerts_api(start_date: str, end_date: str):
api_key = BaseHook.get_connection('miovision_api_key')
key = api_key.extra_dejson['key']
mio_postgres = PostgresHook("miovision_api_bot")
start_date = dateutil.parser.parse(str(start_date))
end_date = dateutil.parser.parse(str(end_date))
with mio_postgres.get_conn() as conn:
pull_alerts(conn, start_date, end_date, key)

@click.group(context_settings=CONTEXT_SETTINGS)
def cli():
pass

@cli.command()
@click.option('--start_date', default=default_start, help='format is YYYY-MM-DD for start date')
@click.option('--end_date' , default=default_end, help='format is YYYY-MM-DD for end date & excluding the day itself')

CONFIG = configparser.ConfigParser()
CONFIG.read('/etc/airflow/data_scripts/volumes/miovision/api/config.cfg')
api_key=CONFIG['API']
key=api_key['key']
dbset = CONFIG['DBSETTINGS']
conn = connect(**dbset)
conn.autocommit = True
def run_alerts_api_cli(start_date, end_date):
return run_alerts_api(start_date, end_date)

class MiovAlertPuller:
"""Miovision API puller.
@@ -44,77 +73,46 @@ class MiovAlertPuller:
key : str
Miovision API access key.
"""

headers = {'Content-Type': 'application/json',
'Authorization': ''}

def __init__(self, active_time, key):
self.active_time = active_time
self.headers['Authorization'] = key

'apikey': ''}
def __init__(self, url, start_time, end_time, key):
self.url = url
self.start_time = start_time.isoformat()
self.end_time = end_time.isoformat()
self.headers['apikey'] = key
def get_response(self):
"""Requests data from API."""

params = {'activeAtTime': self.active_time}

params = {'startDateTime': self.start_time,
'endDateTime': self.end_time}
response = session.get(
url,
url=self.url,
params=params,
headers=self.headers,
proxies=session.proxies)

headers=self.headers
)
# Return if we get a success response code, or raise an error if not.
if response.status_code == 200:
return response
elif response.status_code == 401:
raise Exception('Error' + str(response.status_code))

elif response.status_code == 404:
raise Exception('Error' + str(response.status_code))
def process_timestamp(self, utc_timestamp):
if utc_timestamp is None:
return None
utc_timestamp = dateutil.parser.parse(str(utc_timestamp))
#convert timestamp from utc to local TZ
local_timestamp = utc_timestamp.replace(tzinfo=pytz.utc).astimezone(TZ)
return local_timestamp.replace(tzinfo=None)
def process_alert(self, row):
"""Process one row of Alert API output."""

# Return time, classification_uid, leg, movement, volume.
return (self.active_time, row['intersectionId'], row['type'])

start_timestamp = self.process_timestamp(row['alertStartDateTime'])
end_timestamp = self.process_timestamp(row['alertEndDateTime'])
return (row['alertId'], start_timestamp, end_timestamp, row['intersectionId'], row['type'])
def process_response(self, response):
"""Process the output of self.get_response."""
data = json.loads(response.content.decode('utf-8'))
return [self.process_alert(row) for row in data]

#insert data script
insert_data = """WITH new_values AS (
SELECT intersection_id::text, alert::text, start_time::timestamp, end_time::timestamp
FROM (VALUES %s) AS new_values(intersection_id, alert, start_time, end_time)
),
--extend existing alerts
updated AS (
UPDATE miovision_api.alerts
SET end_time = new_values.end_time
FROM new_values
WHERE
alerts.intersection_id = new_values.intersection_id
AND alerts.alert = new_values.alert
--where old end = new start
AND alerts.end_time = new_values.start_time
--returns the new values used for updates (to be excluded from insert)
RETURNING new_values.*
)
--insert new alerts (exclude updated)
INSERT INTO miovision_api.alerts (intersection_id, alert, start_time, end_time)
SELECT intersection_id, alert, start_time, end_time FROM new_values
EXCEPT
SELECT intersection_id, alert, start_time, end_time FROM updated
ON CONFLICT (intersection_id, alert, start_time) DO NOTHING;
--update foreign key referencing miovision_api.intersections
--handles new records as well as old records with null intersection_uid (newly added intersections)
UPDATE miovision_api.alerts
SET intersection_uid = i.intersection_uid
FROM miovision_api.intersections AS i
WHERE alerts.intersection_id = i.id
AND alerts.intersection_uid IS NULL;
"""
return [self.process_alert(row) for row in data['alerts']], data['links']['next']

def pull_alerts(conn: any, start_date: datetime, end_date: datetime, key: str):
"""Miovision Alert Puller
@@ -133,56 +131,37 @@ def pull_alerts(conn: any, start_date: datetime, end_date: datetime, key: str):
key : str
Miovision API access key.
"""

STEP_SIZE = datetime.timedelta(minutes=5)

#establish list of timestamps to iterate over
start_date = TZ.localize(start_date)
end_date = TZ.localize(end_date)
if end_date < start_date:
raise ValueError('end_time is not greater than start_time.')
dt = start_date
dt_list = [dt, ]
while(dt < end_date):
dt = dt + STEP_SIZE
dt_list.append(dt)

logger.info('Pulling Miovision alerts from %s to %s.', start_date, end_date)

#pull alerts at each timestamps and append to list
#pull alerts from each page and append to list
dfs = []
for dt in dt_list:
miovpull = MiovAlertPuller(dt, key)
test = miovpull.get_response()
response = miovpull.process_response(test)
df = pd.DataFrame(response).drop_duplicates()
pageSize = 100
url = f"{URL_BASE}/alerts?pageSize={pageSize}&pageNumber=0"
while url is not None:
miovpull = MiovAlertPuller(url, start_date, end_date, key)
response = miovpull.get_response()
alerts, url = miovpull.process_response(response)
df = pd.DataFrame(alerts)
dfs.append(df)

logger.info('Done pulling. Transforming alerts.')

#create pandas df and restructure
final = pd.concat(dfs, ignore_index=True)
final.rename(columns={0: "time", 1: "intersection_id", 2: "alert"}, inplace = True)
final['time'] = pd.to_datetime(final['time'])
final.sort_values(by=['intersection_id', 'alert', 'time'], inplace = True)
final.reset_index(drop = True)
final.drop_duplicates(inplace = True, ignore_index = True)

#group by alert and time
final['shifted'] = final.groupby(by = ['intersection_id', 'alert'])['time'].shift(1)
final['time_lag'] = final['time'] - final['shifted']
final['start_time'] = final['time']

#iterate through and check if consecutive
for index, row in final.iterrows():
if (row['time_lag'] == pd.Timedelta(f"{STEP_SIZE} minutes")): #lag = step size
final.at[index, 'start_time'] = final.at[index-1, 'start_time'] #assign previous start time

#find start and end time of consecutive values
summary = final.groupby(by = ['intersection_id', 'alert', 'start_time']).agg({'time': ['max']})
summary = summary.reset_index()

final.rename(columns={0: "alertId", 1: "alertStartDateTime", 2: "alertEndDateTime", 3: "intersection_id", 4: "type"}, inplace = True)
final.replace({np.NaN: None}, inplace = True)
#convert to tuples for inserting
values = list(summary.itertuples(index=False, name=None))

logger.info('Inserting values into `miovision_api.alerts`.')
values = list(final.itertuples(index=False, name=None))

#sql insert data script
fpath = os.path.join(SQL_DIR, 'inserts/insert-miovision_alerts_new.sql')
with open(fpath, 'r', encoding='utf-8') as file:
insert_query = sql.SQL(file.read())

logger.info('Inserting values into `miovision_api.alerts_new`.')
with conn.cursor() as cur:
execute_values(cur, insert_data, values)
execute_values(cur, insert_query, values)

if __name__ == '__main__':
cli()
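
The rewritten puller replaces the old 5-minute activeAtTime polling with a single date-range query that pages through results. Below is a minimal sketch of that pagination pattern, assuming the response shape handled by process_response (an 'alerts' list plus a 'links'/'next' URL); iter_alert_pages is a hypothetical helper name, not part of the module, and error handling is simplified to raise_for_status where the class above checks status codes explicitly:

import datetime
import requests

URL_BASE = 'https://api.miovision.one/api/v1'

def iter_alert_pages(start_time: datetime.datetime, end_time: datetime.datetime,
                     key: str, page_size: int = 100):
    """Yield each page of alerts, following data['links']['next'] until it is None."""
    url = f"{URL_BASE}/alerts?pageSize={page_size}&pageNumber=0"
    headers = {'Content-Type': 'application/json', 'apikey': key}
    params = {'startDateTime': start_time.isoformat(),
              'endDateTime': end_time.isoformat()}
    while url is not None:
        response = requests.get(url, params=params, headers=headers)
        response.raise_for_status()
        data = response.json()
        yield data['alerts']
        url = data['links']['next']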
21 changes: 21 additions & 0 deletions volumes/miovision/sql/inserts/insert-miovision_alerts_new.sql
@@ -0,0 +1,21 @@
INSERT INTO miovision_api.alerts_new AS n (
alert_id, start_time, end_time, intersection_id, alert
)
VALUES %s
ON CONFLICT (alert_id)
DO UPDATE
SET
intersection_id = EXCLUDED.intersection_id,
alert = EXCLUDED.alert,
start_time = EXCLUDED.start_time,
end_time = EXCLUDED.end_time
WHERE n.alert_id = EXCLUDED.alert_id;

--update foreign key referencing miovision_api.intersections
--handles new records as well as old records with null intersection_uid (newly added intersections)
UPDATE miovision_api.alerts_new AS n
SET intersection_uid = i.intersection_uid
FROM miovision_api.intersections AS i
WHERE
n.intersection_id = i.id
AND n.intersection_uid IS NULL;
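
This upsert receives its rows from execute_values in pull_alert.py, which expands the single VALUES %s placeholder into one row per tuple. A minimal sketch of that call, assuming an open psycopg2 connection conn and an illustrative row in the tuple order (alert_id, start_time, end_time, intersection_id, alert):

from psycopg2 import sql
from psycopg2.extras import execute_values

values = [
    # illustrative values only; real rows come from the alerts DataFrame in pull_alerts
    ('a1b2c3', '2024-07-04 10:00', '2024-07-04 10:30', 'some-intersection-id', 'example alert type'),
]

with open('volumes/miovision/sql/inserts/insert-miovision_alerts_new.sql', encoding='utf-8') as f:
    insert_query = sql.SQL(f.read())

with conn.cursor() as cur:
    # ON CONFLICT (alert_id) makes the batch idempotent: re-pulled alerts
    # overwrite the existing row instead of creating duplicates.
    execute_values(cur, insert_query, values)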
43 changes: 43 additions & 0 deletions volumes/miovision/sql/table/create-table-alerts_new.sql
@@ -0,0 +1,43 @@
-- Table: miovision_api.alerts_new

-- DROP TABLE IF EXISTS miovision_api.alerts_new;

CREATE TABLE IF NOT EXISTS miovision_api.alerts_new
(
alert_id text COLLATE pg_catalog."default" NOT NULL,
intersection_id text COLLATE pg_catalog."default" NOT NULL,
alert text COLLATE pg_catalog."default" NOT NULL,
start_time timestamp without time zone NOT NULL,
end_time timestamp without time zone,
intersection_uid integer,
CONSTRAINT miovision_alerts_pkey_new PRIMARY KEY (alert_id),
CONSTRAINT miov_alert_intersection_fkey_new FOREIGN KEY (intersection_uid)
REFERENCES miovision_api.intersections (intersection_uid) MATCH FULL
ON UPDATE NO ACTION
ON DELETE NO ACTION
)

TABLESPACE pg_default;

ALTER TABLE IF EXISTS miovision_api.alerts_new
OWNER TO miovision_admins;

REVOKE ALL ON TABLE miovision_api.alerts_new FROM bdit_humans;
REVOKE ALL ON TABLE miovision_api.alerts_new FROM miovision_api_bot;

GRANT SELECT ON TABLE miovision_api.alerts_new TO bdit_humans;

GRANT ALL ON TABLE miovision_api.alerts_new TO miovision_admins;

GRANT INSERT, SELECT, DELETE, UPDATE ON TABLE miovision_api.alerts_new TO miovision_api_bot;

COMMENT ON TABLE miovision_api.alerts_new IS E''
'This table contains Miovision alerts pulled by a daily Airflow DAG `miovision_pull`, `pull_alerts` task. '
'Note: a more detailed description is available on Miovision One.';

COMMENT ON COLUMN miovision_api.alerts_new.intersection_id
IS 'The intersection id, corresponding to intersections.intersection_id column';

COMMENT ON COLUMN miovision_api.alerts_new.alert IS E''
'Short text description of the alert. More detail on the different alerts can be found here:
https://help.miovision.com/s/article/Alert-and-Notification-Types';
