#1088 readme, comment updates
gabrielwol committed Jan 10, 2025
1 parent e73e818 commit f06ef50
Showing 5 changed files with 87 additions and 52 deletions.
4 changes: 2 additions & 2 deletions dags/rodars_pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
'retries': 1,
'retry_delay': timedelta(minutes=5),
'retry_exponential_backoff': True, #Allow for progressively longer waits between retries
#'on_failure_callback': partial(task_fail_slack_alert, use_proxy = True),
'on_failure_callback': partial(task_fail_slack_alert, use_proxy = True),
'catchup': True,
}

Expand Down Expand Up @@ -65,4 +65,4 @@ def pull_rodar_locations(ds = None):
pull_rodars_issues()
pull_rodar_locations()

rodars_dag()
rodars_dag()
96 changes: 56 additions & 40 deletions events/construction/readme.md
Original file line number Diff line number Diff line change
@@ -1,62 +1,45 @@
# Introduction

[Road Disruption Activity Reporting System (RoDARS)](https://www.toronto.ca/services-payments/streets-parking-transportation/road-restrictions-closures/road-disruption-activity-reporting-system-rodars/)
[!IMPORTANT]
The city website gives a good overview of RoDARS (here/below): [Road Disruption Activity Reporting System (RoDARS)](https://www.toronto.ca/services-payments/streets-parking-transportation/road-restrictions-closures/road-disruption-activity-reporting-system-rodars/)

> RoDARS is a system that informs the public of planned roadway closures throughout the City. The submission procedure follows the acquisition of an approved Street Occupation Permit (construction) or Street Closure Permit (event).
> RoDARS is a system that informs the public of planned roadway closures throughout the City. The submission procedure follows the acquisition of an approved [Street Occupation Permit](https://www.toronto.ca/?page_id=80501) (construction) or [Street Closure Permit](https://www.toronto.ca/?page_id=84975) (event).
>
> When occupying any portion of the City’s public right of way that is not an expressway, the applicant must submit a RoDARS Notification Form (opens in new window) to TMC Dispatch at least two business days before the start of occupation. The RoDARS Notification Form must be approved by the appropriate Work Zone Traffic Coordinator (WZTC) before being submitted to TMC Dispatch.
> When occupying any portion of the City’s public right of way that is not an expressway, the applicant must submit a [**RoDARS Notification Form**](https://www.toronto.ca/wp-content/uploads/2019/03/8de1-TS_Fillable-RoDARS-Form.pdf) to TMC Dispatch at least two business days before the start of occupation. The RoDARS Notification Form must be approved by the appropriate Work Zone Traffic Coordinator (WZTC) before being submitted to TMC Dispatch.
>
> When occupying any portion of a City expressway (F.G.G., DVP or Allen Rd between Eglinton Ave W and Transit Rd), the applicant must submit a RoDARS Notification Form to TMC Dispatch at least seven business days before the start of occupation. The RoDARS Notification Form must be approved by the appropriate City project manager/engineer before submittal to TMC Dispatch. Once attained from TMC Dispatch, TMC’s RESCU Unit will then notify the applicant of the approval verdict.
>
> A separate RoDARS Notification Form is required for each occupied roadway. If the daily schedule varies, separate RoDARS Notification Forms are required for each day. Once the RoDARS form has been submitted and approved, the information then appears on the Traffic Restrictions Map. Please refer to the City Expressway Closure Guidelines (opens in new window) for allowable roadway occupancy times.
> A separate RoDARS Notification Form is required for each occupied roadway. If the daily schedule varies, separate RoDARS Notification Forms are required for each day. Once the RoDARS form has been submitted and approved, the information then appears on the [Traffic Restrictions Map](https://www.toronto.ca/?page_id=63656). Please refer to the [City Expressway Closure Guidelines](https://www.toronto.ca/wp-content/uploads/2017/11/9184-0_RoDARS-City-Expressway-Closure-Guidelines-a.pdf) for allowable roadway occupancy times.
>
> The applicant must notify the City if either of the following situations arise:
>
> the work schedule and/or work zone plan has been revised or postponed. The applicant must submit a revised and approved RoDARS Notification Form at least one business day before changes occur
> the work has been cancelled or completed early. The applicant must contact TMC Dispatch

## RODARS DAG

<!-- rodars_pull_doc_md -->

- `pull_rodars`: pulls RODARS issue data from ITSC and inserts into RDS.

<!-- rodars_pull_doc_md -->
> 1. the work schedule and/or work zone plan has been revised or postponed. The applicant must submit a revised and approved RoDARS Notification Form at least one business day before changes occur
> 2. the work has been cancelled or completed early. The applicant must contact TMC Dispatch
[!TIP]
The RoDARS form is public here: https://rodars.transnomis.com/Permit/PermitApplicationCreate/a9180443-b97f-548e-ae1c-fc70cae18a7a?previewMode=Applicants

RODARs form showing extremely detailed lane management plan dropdowns.
Here is a screenshot of the extremely detailed geographic/lane management plan UI:
![Rodars Form](rodars_form.png)

**Questions:**
# RODARS vs RODARS New ("rodars_new_approved")

What is included in RODARS vs. not?
RODARS new vs old?
Sources besides RODARS on ITS Central? (divisionid)
[!IMPORTANT]
In 2024 a new version of RODARS was introduced, which should result in a more reliable data source.

[Road Restrictions Map](https://www.toronto.ca/services-payments/streets-parking-transportation/road-restrictions-closures/restrictions-map/#location=2%20Muggs%20Island%20Pk&lat=43.62414889248682&lng=-79.38697494415&zoom=14)

- Hazard: what's this?
- Construction: just RODARs?
- Road Closed: what's this?


Column Questions:

`timeoption`: 0-4
-
`lanesaffected`: "{""LocationDescription"":""Huron St from Harbord St to Classic Ave"",""EncodedCoordinates"":""{_oiGpyrcNrDoA"",""LaneApproaches"":[{""Direction"":3,""RoadName"":""Huron St"",""FeatureId"":1143425,""RoadId"":3716,""LanesAffectedPattern"":""LOWO"",""LaneBlockLevel"":2,""RoadClosureType"":20},{""Direction"":2,""RoadName"":""Huron St"",""FeatureId"":1143425,""RoadId"":3716,""LanesAffectedPattern"":""LOWO"",""LaneBlockLevel"":2,""RoadClosureType"":20}],""LocationBlockLevel"":3,""RoadClosureType"":20}"
- `laneblocklevel`
- `LanesAffectedPattern`
- `LocationBlockLevel`
- `RoadClosureType`
**RODARS (New)**
- RODARs New has only been around since 2024-03 (already has more than 28,000 issues!)
- An online form which contractors fill out directly. Approvals are done by work zone coordinators.
- QR codes will start appearing at sites in 2024/2025, which should help enforceability (citizen reporting/bylaw officers).
- There will be penalties.
- Most records have `centreline_id`!
- Contains a detailed description of the lane closure pattern (`lanesaffectedpattern`).

## Comparing RODARS and RODARS New (`rodars_new_approved`)
**RODARS (Old)**
- Apparently fax was involved and not all forms were processed, so completeness is a concern.
- `centreline_id` was introduced later in the lifespan of original RODARS (Only about 1/3 of those records have a centreline_id, starting from 2021-09).

Here is a small comparison of the data quality of the new and old RODARS.
- RODARs New has only been around since 2024-03 (already has more than 28,000 issues!)
- centreline_id was introduced later in the lifespan of original RODARS (Only about 1/3 of those records have a centreline_id, starting from 2021-09).
Here is a small comparison of the data of the new and old RODARS (differentiated by `divisionid` / `divisionname` as seen below):

| "divisionid" | "divisionname" | "avg_actual_duration" | "avg_proposed_duration" | "min_starttimestamp" | "max_starttimestamp" | "count" | "has_centreline_id" | "start_centreline" |
|--------------|-----------------------|---------------------------|---------------------------|----------------------|----------------------|---------|---------------------|------------------------------|
Expand All @@ -77,3 +60,36 @@ SELECT
FROM congestion_events.rodars_locations
GROUP BY 1, 2 ORDER BY 1, 2;
```

### What's included?
- As noted in [the intro](#introduction), both construction and events (e.g. parades, marathons) are included.
- Emergency utility work may be included (unconfirmed).
- Notably, CafeTO is not included (as of the end of 2024).

# Data Ops

## RODARS DAG
The `rodars_pull` DAG runs on Morbius in order to access the ITS Central database. See code here: [rodars_pull.py](../../dags/rodars_pull.py).

<!-- rodars_pull_doc_md -->

- `pull_rodars_issues`: pulls RODARS issue data from ITSC and inserts into RDS.
- `pull_rodars_locations`: pulls RODARS issue location data from ITSC and inserts into RDS.

<!-- rodars_pull_doc_md -->

## `lanesaffected`

`lanesaffected` is a loosely formatted json column in the ITSC issuelocationnew table.

Notes:
- This field is unpacked with `process_lanesaffected` function in [rodars_functions.py](./rodars_functions.py) and converted to tabular format.
- Some of the same field names are used in both the top level and the nested json, `LaneApproaches`, e.g. `RoadClosureType`. The `_toplevel` suffix is used in `congestion_events.rodars_issue_locations` for the top level fields.
- It is assumed the lower level details are more descriptive when available.
- FeatureId = centreline_id!
- `LanesAffectedPattern` is a code describing lane closures. In `congestion_events.rodars_issue_locations` it is converted to numeric columns: `lane_open_auto, lane_closed_auto, lane_open_bike, lane_closed_bike, lane_open_ped, lane_closed_ped, lane_open_bus, lane_closed_bus`

Sample:
```
`lanesaffected`: "{""LocationDescription"":""Huron St from Harbord St to Classic Ave"",""EncodedCoordinates"":""{_oiGpyrcNrDoA"",""LaneApproaches"":[{""Direction"":3,""RoadName"":""Huron St"",""FeatureId"":1143425,""RoadId"":3716,""LanesAffectedPattern"":""LOWO"",""LaneBlockLevel"":2,""RoadClosureType"":20},{""Direction"":2,""RoadName"":""Huron St"",""FeatureId"":1143425,""RoadId"":3716,""LanesAffectedPattern"":""LOWO"",""LaneBlockLevel"":2,""RoadClosureType"":20}],""LocationBlockLevel"":3,""RoadClosureType"":20}"
```
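
The notes above can be sketched in a few lines of standard-library Python. This is an illustration only, not the `process_lanesaffected` implementation in `rodars_functions.py` (which returns a pandas dataframe): `flatten_lanesaffected` is a hypothetical helper, and the sample string is the value above with the doubled quotes from the export collapsed. It assumes one output row per `LaneApproaches` entry, with top level keys given the `_toplevel` suffix and `FeatureId` / `RoadId` renamed to `centreline_id` / `linear_name_id`.

```python
import json

# Sample `lanesaffected` value from above, with the export's doubled quotes collapsed.
SAMPLE = (
    '{"LocationDescription":"Huron St from Harbord St to Classic Ave",'
    '"EncodedCoordinates":"{_oiGpyrcNrDoA",'
    '"LaneApproaches":['
    '{"Direction":3,"RoadName":"Huron St","FeatureId":1143425,"RoadId":3716,'
    '"LanesAffectedPattern":"LOWO","LaneBlockLevel":2,"RoadClosureType":20},'
    '{"Direction":2,"RoadName":"Huron St","FeatureId":1143425,"RoadId":3716,'
    '"LanesAffectedPattern":"LOWO","LaneBlockLevel":2,"RoadClosureType":20}],'
    '"LocationBlockLevel":3,"RoadClosureType":20}'
)

# Nested keys that get new names; all other nested keys are kept as-is.
RENAME = {'FeatureId': 'centreline_id', 'RoadId': 'linear_name_id'}

def flatten_lanesaffected(json_str):
    """Hypothetical helper: return one dict per LaneApproaches entry."""
    if json_str is None or json_str == 'Unknown':
        return []
    doc = json.loads(json_str)
    # Pull out the nested list; fall back to a single empty approach so the
    # top level attributes are still returned when the list is missing.
    approaches = doc.pop('LaneApproaches', None) or [{}]
    # Everything left at the top level gets the _toplevel suffix.
    top = {f'{key}_toplevel': value for key, value in doc.items()}
    rows = []
    for approach in approaches:
        row = dict(top)
        row.update({RENAME.get(key, key): value for key, value in approach.items()})
        rows.append(row)
    return rows

rows = flatten_lanesaffected(SAMPLE)
for row in rows:
    print(row['centreline_id'], row['Direction'], row['LanesAffectedPattern'])
```

Because the suffix is applied before merging, `RoadClosureType` (from the approach) and `RoadClosureType_toplevel` can coexist in the same row without overwriting each other.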
30 changes: 21 additions & 9 deletions events/construction/rodars_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,24 @@

from airflow.providers.postgres.hooks.postgres import PostgresHook

#fpath = '/data/home/gwolofs/bdit_data-sources/events/rodars/rodars_issues_functions.py'
#SQL_DIR = os.path.join(os.path.abspath(os.path.dirname(fpath)), 'sql')
SQL_DIR = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'sql')

LOGGER = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

def coordinates_from_binary(br):
# Read longitude and latitude as doubles (8 bytes each)
'Read longitude and latitude as doubles (8 bytes each)'
longitude, latitude = struct.unpack('dd', br.read(16))
return (longitude, latitude)

def coordinates_to_geomfromtext(l):
'Formats points and line geoms to be ingested by postgres `st_geomfromtext`'
geom_type = 'POINT' if len(l) == 1 else 'LINESTRING'
coords = ', '.join([f"{x[0]} {x[1]}" for x in l])
return (f"{geom_type}({coords})")
return f"{geom_type}({coords})"

def geometry_from_bytes(geo_bytes):
# Initialize a stream to read binary data from the byte array
'Initialize a stream to read binary data from the byte array'
coordinates_list = []
with BytesIO(geo_bytes) as ms:
# Read the first 4 bytes = length
Expand All @@ -44,6 +43,12 @@ def geometry_from_bytes(geo_bytes):
return coordinates_list

def process_lanesaffected(json_str):
'''Converts a json variable to pandas dataframe.
Top level json attributes are given _toplevel suffix,
while contents of LaneApproaches nested json keeps original keys,
with exception of FeatureId (centreline_id) and RoadId (linear_name_id).'''

if (json_str == 'Unknown') | (json_str is None):
return None
try:
Expand Down Expand Up @@ -76,6 +81,7 @@ def fetch_and_insert_issue_data(
insert_conn = PostgresHook('vds_bot'),
start_date = None
):
'''Fetch, process and insert data from ITS Central issuedata table.'''
select_fpath = os.path.join(SQL_DIR, 'select-rodars_issues.sql')
with open(select_fpath, 'r', encoding="utf-8") as file:
select_query = sql.SQL(file.read()).format(
Expand Down Expand Up @@ -109,7 +115,15 @@ def fetch_and_insert_location_data(
insert_conn = PostgresHook('vds_bot'),
start_date = None
):
#generic function to pull and insert data using different connections and queries.
'''Fetch, process and insert data from ITS Central issuelocationnew table.
- Fetches data from ITS Central
- Processes geometry data stored in binary (accounts for both points/lines).
- Unnests multi-layered lanesaffected json column into tabular form.
- Performs some checks on columns unnested from json.
- Inserts into RDS `congestion_events.rodars_issue_locations` table.
'''

select_fpath = os.path.join(SQL_DIR, 'select-rodars_issue_locations.sql')
with open(select_fpath, 'r', encoding="utf-8") as file:
select_query = sql.SQL(file.read()).format(
Expand Down Expand Up @@ -159,7 +173,7 @@ def fetch_and_insert_location_data(
#check for extra columns unpacked from json.
extra_cols = [col for col in df_no_geom.columns if col not in cols_to_insert]
if extra_cols != []:
LOGGER.warning(f'There are extra columns unpacked from json not being inserted: %s', extra_cols)
LOGGER.warning('There are extra columns unpacked from json not being inserted: %s', extra_cols)
#add missing columns (inconsistent jsons)
missing_cols = [col for col in cols_to_insert if col not in df_no_geom.columns]
if missing_cols != []:
Expand All @@ -184,5 +198,3 @@ def fetch_and_insert_location_data(

with insert_conn.get_conn() as con, con.cursor() as cur:
execute_values(cur, insert_query, df_no_geom, page_size = 1000)

#fetch_and_insert_data()
2 changes: 2 additions & 0 deletions events/construction/sql/select-rodars_issue_locations.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
--this select query is used to select issue locations from ITSC database in rodars_pull pipeline.

WITH issues AS (
--select the most recent version of each issue
SELECT
Expand Down
7 changes: 6 additions & 1 deletion events/construction/sql/select-rodars_issues.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
--this select query is used to select issue metadata from ITSC database in rodars_pull pipeline.

WITH issues AS (
--select the most recent version of each issue
SELECT DISTINCT ON (divisionid, issueid)
Expand All @@ -20,7 +22,10 @@ WITH issues AS (
)
AND timestamputc >= {start}::date -- noqa: PRS, LT02
AND timestamputc < {start}::date + interval '1 day' -- noqa: PRS
ORDER BY divisionid ASC, issueid ASC, timestamputc DESC
ORDER BY
divisionid ASC,
issueid ASC,
timestamputc DESC
)

SELECT
Expand Down
