Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1007 miovision one updated alerts schema #1008

Merged
merged 10 commits into from
Jul 16, 2024
22 changes: 8 additions & 14 deletions dags/miovision_pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
pull_data, find_gaps, aggregate_15_min_mvt, aggregate_15_min, aggregate_volumes_daily,
get_report_dates, get_intersection_info, agg_zero_volume_anomalous_ranges
)
from volumes.miovision.api.pull_alert import pull_alerts
from volumes.miovision.api.pull_alert import run_alerts_api
except:
raise ImportError("Cannot import DAG helper functions.")

Expand Down Expand Up @@ -114,18 +114,12 @@ def pull_miovision(ds = None, **context):
with mio_postgres.get_conn() as conn:
pull_data(conn, start_time, end_time, INTERSECTION, key)

@task(task_id = 'pull_alerts', trigger_rule='none_failed', retries = 1)
def pull_alerts_task(ds):
CONFIG = configparser.ConfigParser()
CONFIG.read(API_CONFIG_PATH)
api_key=CONFIG['API']
key=api_key['key']
start_date = dateutil.parser.parse(str(ds))
end_date = dateutil.parser.parse(str(ds_add(ds, 1)))
mio_postgres = PostgresHook("miovision_api_bot")

with mio_postgres.get_conn() as conn:
pull_alerts(conn, start_date, end_date, key)
@task(trigger_rule='none_failed', retries=1)
def pull_alerts(ds):
    """Pull Miovision alerts for the execution date.

    Delegates to ``run_alerts_api`` with a one-day window:
    [ds, ds + 1 day), i.e. the end date is exclusive.
    """
    run_alerts_api(start_date=ds, end_date=ds_add(ds, 1))

@task_group(tooltip="Tasks to aggregate newly pulled Miovision data.")
def miovision_agg():
Expand Down Expand Up @@ -266,7 +260,7 @@ def data_checks():

(
check_partitions() >>
[pull_miovision(), pull_alerts_task()] >>
[pull_miovision(), pull_alerts()] >>
miovision_agg() >>
t_done >>
data_checks()
Expand Down
199 changes: 88 additions & 111 deletions volumes/miovision/api/pull_alert.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
import os
import datetime
import dateutil
import pytz
import json
from requests import Session
import logging
import configparser
import pandas as pd
from psycopg2 import connect
import numpy as np
import click
from psycopg2 import sql
from psycopg2.extras import execute_values

from airflow.hooks.base_hook import BaseHook
from airflow.providers.postgres.hooks.postgres import PostgresHook

def logger():
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
Expand All @@ -21,17 +28,37 @@ def logger():

session = Session()
session.proxies = {}
url = 'https://api.miovision.com/alerts/'

"""
CONFIG = configparser.ConfigParser()
CONFIG.read('/etc/airflow/data_scripts/volumes/miovision/api/config.cfg')
api_key=CONFIG['API']
key=api_key['key']
dbset = CONFIG['DBSETTINGS']
conn = connect(**dbset)
conn.autocommit = True
"""
URL_BASE = 'https://api.miovision.one/api/v1'
TZ = pytz.timezone("Canada/Eastern")
time_delta = datetime.timedelta(days=1)
default_start = str(datetime.date.today()-time_delta)
default_end = str(datetime.date.today())

SQL_DIR = os.path.join(os.path.dirname(os.path.abspath(os.path.dirname(__file__))), 'sql')

CONTEXT_SETTINGS = dict(
default_map={'run_alerts_api_cli': {'flag': 0}}
)

def run_alerts_api(start_date: str, end_date: str):
    """Pull Miovision alerts in [start_date, end_date) and insert them.

    Reads the API key from the ``miovision_api_key`` Airflow Connection
    and connects to Postgres via the ``miovision_api_bot`` hook, then
    hands off to ``pull_alerts`` for the actual pull + insert.

    Parameters
    ----------
    start_date : str
        Start date, YYYY-MM-DD, inclusive.
    end_date : str
        End date, YYYY-MM-DD, exclusive.
    """
    key = BaseHook.get_connection('miovision_api_key').extra_dejson['key']
    # Accept either date strings or date-like objects; normalize to datetime.
    start_dt = dateutil.parser.parse(str(start_date))
    end_dt = dateutil.parser.parse(str(end_date))
    postgres_hook = PostgresHook("miovision_api_bot")
    with postgres_hook.get_conn() as conn:
        pull_alerts(conn, start_dt, end_dt, key)

@click.group(context_settings=CONTEXT_SETTINGS)
def cli():
    """Click command group for the Miovision alert puller CLI."""

@cli.command()
@click.option('--start_date', default=default_start, help='format is YYYY-MM-DD for start date')
@click.option('--end_date', default=default_end, help='format is YYYY-MM-DD for end date & excluding the day itself')
def run_alerts_api_cli(start_date, end_date):
    """Command-line wrapper around ``run_alerts_api``.

    Useful for backfilling multiple dates outside the Airflow DAG; must be
    run where the Airflow Connections used for credentials are available.
    """
    return run_alerts_api(start_date, end_date)

class MiovAlertPuller:
"""Miovision API puller.
Expand All @@ -46,77 +73,46 @@ class MiovAlertPuller:
key : str
Miovision API access key.
"""

headers = {'Content-Type': 'application/json',
'Authorization': ''}

def __init__(self, active_time, key):
self.active_time = active_time
self.headers['Authorization'] = key

'apikey': ''}
def __init__(self, url, start_time, end_time, key):
self.url = url
self.start_time = start_time.isoformat()
self.end_time = end_time.isoformat()
self.headers['apikey'] = key
def get_response(self):
"""Requests data from API."""

params = {'activeAtTime': self.active_time}

params = {'startDateTime': self.start_time,
'endDateTime': self.end_time}
response = session.get(
url,
url=self.url,
params=params,
headers=self.headers,
proxies=session.proxies)

headers=self.headers
)
# Return if we get a success response code, or raise an error if not.
if response.status_code == 200:
return response
elif response.status_code == 401:
raise Exception('Error' + str(response.status_code))

elif response.status_code == 404:
raise Exception('Error' + str(response.status_code))
def process_timestamp(self, utc_timestamp):
if utc_timestamp is None:
return None
utc_timestamp = dateutil.parser.parse(str(utc_timestamp))
#convert timestamp from utc to local TZ
local_timestamp = utc_timestamp.replace(tzinfo=pytz.utc).astimezone(TZ)
return local_timestamp.replace(tzinfo=None)
def process_alert(self, row):
"""Process one row of Alert API output."""

# Return time, classification_uid, leg, movement, volume.
return (self.active_time, row['intersectionId'], row['type'])

start_timestamp = self.process_timestamp(row['alertStartDateTime'])
end_timestamp = self.process_timestamp(row['alertEndDateTime'])
return (row['alertId'], start_timestamp, end_timestamp, row['intersectionId'], row['type'])
def process_response(self, response):
"""Process the output of self.get_response."""
data = json.loads(response.content.decode('utf-8'))
return [self.process_alert(row) for row in data]

#insert data script
insert_data = """WITH new_values AS (
SELECT intersection_id::text, alert::text, start_time::timestamp, end_time::timestamp
FROM (VALUES %s) AS new_values(intersection_id, alert, start_time, end_time)
),

--extend existing alerts
updated AS (
UPDATE miovision_api.alerts
SET end_time = new_values.end_time
FROM new_values
WHERE
alerts.intersection_id = new_values.intersection_id
AND alerts.alert = new_values.alert
--where old end = new start
AND alerts.end_time = new_values.start_time
--returns the new values used for updates (to be excluded from insert)
RETURNING new_values.*
)

--insert new alerts (exclude updated)
INSERT INTO miovision_api.alerts (intersection_id, alert, start_time, end_time)
SELECT intersection_id, alert, start_time, end_time FROM new_values
EXCEPT
SELECT intersection_id, alert, start_time, end_time FROM updated
ON CONFLICT (intersection_id, alert, start_time) DO NOTHING;

--update foreign key referencing miovision_api.intersections
--handles new records as well as old records with null intersection_uid (newly added intersections)
UPDATE miovision_api.alerts
SET intersection_uid = i.intersection_uid
FROM miovision_api.intersections AS i
WHERE alerts.intersection_id = i.id
AND alerts.intersection_uid IS NULL;
"""
return [self.process_alert(row) for row in data['alerts']], data['links']['next']

def pull_alerts(conn: any, start_date: datetime, end_date: datetime, key: str):
"""Miovision Alert Puller
Expand All @@ -135,56 +131,37 @@ def pull_alerts(conn: any, start_date: datetime, end_date: datetime, key: str):
key : str
Miovision API access key.
"""

STEP_SIZE = datetime.timedelta(minutes=5)

#establish list of timestamps to iterate over
start_date = TZ.localize(start_date)
end_date = TZ.localize(end_date)
if end_date < start_date:
raise ValueError('end_time is not greater than start_time.')
dt = start_date
dt_list = [dt, ]
while(dt < end_date):
dt = dt + STEP_SIZE
dt_list.append(dt)

logger.info('Pulling Miovision alerts from %s to %s.', start_date, end_date)

#pull alerts at each timestamps and append to list
#pull alerts from each page and append to list
dfs = []
for dt in dt_list:
miovpull = MiovAlertPuller(dt, key)
test = miovpull.get_response()
response = miovpull.process_response(test)
df = pd.DataFrame(response).drop_duplicates()
pageSize = 100
gabrielwol marked this conversation as resolved.
Show resolved Hide resolved
url = f"{URL_BASE}/alerts?pageSize={pageSize}&pageNumber=0"
while url is not None:
miovpull = MiovAlertPuller(url, start_date, end_date, key)
response = miovpull.get_response()
alerts, url = miovpull.process_response(response)
df = pd.DataFrame(alerts)
dfs.append(df)

logger.info('Done pulling. Transforming alerts.')

#create pandas df and restructure
#create pandas df and restructure
final = pd.concat(dfs, ignore_index=True)
final.rename(columns={0: "time", 1: "intersection_id", 2: "alert"}, inplace = True)
final['time'] = pd.to_datetime(final['time'])
final.sort_values(by=['intersection_id', 'alert', 'time'], inplace = True)
final.reset_index(drop = True)
final.drop_duplicates(inplace = True, ignore_index = True)

#group by alert and time
final['shifted'] = final.groupby(by = ['intersection_id', 'alert'])['time'].shift(1)
final['time_lag'] = final['time'] - final['shifted']
final['start_time'] = final['time']

#iterate through and check if consecutive
for index, row in final.iterrows():
if (row['time_lag'] == pd.Timedelta(f"{STEP_SIZE} minutes")): #lag = step size
final.at[index, 'start_time'] = final.at[index-1, 'start_time'] #assign previous start time

#find start and end time of consecutive values
summary = final.groupby(by = ['intersection_id', 'alert', 'start_time']).agg({'time': ['max']})
summary = summary.reset_index()

final.rename(columns={0: "alertId", 1: "alertStartDateTime", 2: "alertEndDateTime", 3: "intersection_id", 4: "type"}, inplace = True)
final.replace({np.NaN: None}, inplace = True)
#convert to tuples for inserting
values = list(summary.itertuples(index=False, name=None))

values = list(final.itertuples(index=False, name=None))

#sql insert data script
fpath = os.path.join(SQL_DIR, 'inserts/insert-miovision_alerts.sql')
with open(fpath, 'r', encoding='utf-8') as file:
insert_query = sql.SQL(file.read())

logger.info('Inserting values into `miovision_api.alerts`.')
with conn.cursor() as cur:
execute_values(cur, insert_data, values)
execute_values(cur, insert_query, values)

if __name__ == '__main__':
cli()
24 changes: 20 additions & 4 deletions volumes/miovision/api/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
- [Error responses](#error-responses)
- [Input Files](#input-files)
- [How to run the api](#how-to-run-the-api)
- [Command Line Options](#command-line-options)
- [TMCs (Volumes)](#tmcs-volumes)
- [Alerts](#alerts)
- [Classifications](#classifications)
- [API Classifications](#api-classifications)
- [Old Classifications (csv dumps and datalink)](#old-classifications-csv-dumps-and-datalink)
Expand All @@ -25,6 +26,7 @@
- [Notes](#notes)

<!-- /TOC -->

# Overview
This readme contains information on the script used to pull data from the Miovision `intersection_tmc` API and descriptions of the Airflow DAGs which make use of the API scripts and [sql functions](../sql/readme.md#postgresql-functions) to pull, aggregate, and run data quality checks on newly pulled data.
Expand Down Expand Up @@ -115,11 +117,13 @@ password={password}

## How to run the api

The process to use the API to download data is typically run through the daily [miovision_pull Airflow DAG](../../../dags/miovision_pull.py). However it can also be run through the command line. This can be useful when adding new intersections, or when troubleshooting.
### TMCs (Volumes)

The process to use the API to download volumes data is typically run through the daily [miovision_pull Airflow DAG](../../../dags/miovision_pull.py). However it can also be run through the command line. This can be useful when adding new intersections, or when troubleshooting.

In command prompt, navigate to the folder where the python file is [located](../api/) and run `python3 intersection_tmc.py run-api ...` with various command line options listed below. For example, to download and aggregate data from a custom date range, run `python3 intersection_tmc.py run-api --pull --agg --start_date=YYYY-MM-DD --end_date=YYYY-MM-DD`. The start and end variables will indicate the start and end date to pull data from the api.

### Command Line Options
**TMC Command Line Options**

|Option|Format|Description|Example|Default|
|-----|-------|-----|-----|-----|
Expand All @@ -137,6 +141,18 @@ In command prompt, navigate to the folder where the python file is [located](../

The `--pull` and `--agg` commands allow us to run data pulling and aggregation together or independently, which is useful for when we want to check out the data before doing any processing. For example, when we are [finding valid intersection movements for new intersections](../update_intersections/readme.md#update-miovision_apiintersection_movements).

### Alerts

Although it is typically run daily through the Airflow DAG [miovision_pull](../../../dags/miovision_pull.py) `pull_alerts` task, you can also pull from the Alerts API using the command line within the airflow venv (since Airflow Connections are used for database connection and API key). This is helpful for backfilling multiple dates at once. An example command is:
`python3 pull_alert.py run-alerts-api-cli --start_date=2024-06-01 --end_date=2024-07-01`
gabrielwol marked this conversation as resolved.
Show resolved Hide resolved

**Alerts Command Line Options**

|Option|Format|Description|Example|Default|
|-----|-------|-----|-----|-----|
|start_date|YYYY-MM-DD|Specifies the start date to pull data from. Inclusive. |2018-08-01|The previous day|
|end_date|YYYY-MM-DD|Specifies the end date to pull data from. Must be at least 1 day after `start_date` and cannot be a future date. Exclusive. |2018-08-05|Today|

## Classifications

The classification given in the api is different than the ones given in the csv dumps, or the datalink.
Expand Down Expand Up @@ -259,7 +275,7 @@ This updated Miovision DAG runs daily at 3am. The pull data tasks and subsequent
- `create_month_partition` contains any partition creates necessary for a new month.

`pull_miovision` pulls data from the API and inserts into `miovision_api.volumes` using `intersection_tmc.pull_data` function.
- `pull_alerts` pulls alerts from the API at 5 minute increments and inserts into [`miovision_api.alerts`](../sql/readme.md#alerts), extending existing alerts. The records are de-dupped (duplicates are a result of the short-form alert title used by the API), sorted, and runs are identified to identify the approximate alert start/end time. Before inserting, records are first used to update the end time of alerts that are continuous with existing alerts.
- `pull_alerts` pulls alerts occurring on this day from the API and inserts into [`miovision_api.alerts`](../sql/readme.md#alerts), updating `end_time` of existing alerts.

### `miovision_agg` TaskGroup
This task group completes various Miovision aggregations.
Expand Down
21 changes: 21 additions & 0 deletions volumes/miovision/sql/inserts/insert-miovision_alerts.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
--Upsert Miovision alerts pulled from the Alerts API.
--%s is expanded by psycopg2.extras.execute_values with rows of
--(alert_id, start_time, end_time, intersection_id, alert).
INSERT INTO miovision_api.alerts AS n (
alert_id, start_time, end_time, intersection_id, alert
)
VALUES %s
--alert_id is unique per alert: re-pulled alerts update in place, e.g. an
--ongoing alert gets its end_time extended on a later pull.
ON CONFLICT (alert_id)
DO UPDATE
SET
intersection_id = EXCLUDED.intersection_id,
alert = EXCLUDED.alert,
start_time = EXCLUDED.start_time,
end_time = EXCLUDED.end_time
--NOTE(review): on an alert_id conflict this predicate is always true;
--presumably kept for explicitness — confirm before removing.
WHERE n.alert_id = EXCLUDED.alert_id;

--update foreign key referencing miovision_api.intersections
--handles new records as well as old records with null intersection_uid (newly added intersections)
UPDATE miovision_api.alerts AS n
SET intersection_uid = i.intersection_uid
FROM miovision_api.intersections AS i
WHERE
n.intersection_id = i.id
AND n.intersection_uid IS NULL;
Loading