Merge pull request #1014 from CityofToronto/984-ecocounter-pull-recent-outages

984 ecocounter pull recent outages
gabrielwol authored Jul 31, 2024
2 parents 646c389 + 311c130 commit b3ca04c
Showing 8 changed files with 147 additions and 56 deletions.
57 changes: 26 additions & 31 deletions dags/ecocounter_pull.py
@@ -31,9 +31,8 @@
from dags.common_tasks import check_jan_1st, wait_for_weather_timesensor
from dags.custom_operators import SQLCheckOperatorWithReturnValue
from volumes.ecocounter.pull_data_from_api import (
getToken, getSites, getFlowData, siteIsKnownToUs, insertSite,
insertFlow, flowIsKnownToUs, truncateFlowSince, insertFlowCounts,
getKnownSites, getKnownFlows
getToken, getSites, siteIsKnownToUs, insertSite, insertFlow,
flowIsKnownToUs, getKnownSites, getKnownFlows, truncate_and_insert
)
except:
raise ImportError("Cannot import DAG helper functions.")
@@ -85,9 +84,8 @@ def check_partitions():
)

check_jan_1st.override(task_id="check_annual_partition")() >> create_annual_partition

@task(trigger_rule='none_failed')
def update_sites_and_flows(**context):

def get_connections():
api_conn = BaseHook.get_connection('ecocounter_api_key')
token = getToken(
api_conn.host,
@@ -96,7 +94,11 @@ def update_sites_and_flows(**context):
api_conn.extra_dejson['secret_api_hash']
)
eco_postgres = PostgresHook("ecocounter_bot")
return eco_postgres, token

@task(trigger_rule='none_failed')
def update_sites_and_flows(**context):
eco_postgres, token = get_connections()
new_sites, new_flows = [], []
with eco_postgres.get_conn() as conn:
for site in getSites(token):
@@ -140,38 +142,30 @@ def update_sites_and_flows(**context):

@task(trigger_rule='none_failed')
def pull_ecocounter(ds):
api_conn = BaseHook.get_connection('ecocounter_api_key')
token = getToken(
api_conn.host,
api_conn.login,
api_conn.password,
api_conn.extra_dejson['secret_api_hash']
)
eco_postgres = PostgresHook("ecocounter_bot")

eco_postgres, token = get_connections()
start_date = dateutil.parser.parse(str(ds))
end_date = dateutil.parser.parse(str(ds_add(ds, 1)))
LOGGER.info(f'Pulling data from {start_date} to {end_date}.')
with eco_postgres.get_conn() as conn:
for site_id in getKnownSites(conn):
LOGGER.debug(f'Starting on site {site_id}.')
for flow_id in getKnownFlows(conn, site_id):
LOGGER.debug(f'Starting on flow {flow_id} for site {site_id}.')
# empty the count table for this flow
truncateFlowSince(flow_id, conn, start_date, end_date)
# and fill it back up!
LOGGER.debug(f'Fetching data for flow {flow_id}.')
counts = getFlowData(token, flow_id, start_date, end_date)
#convert response into a tuple for inserting
volume=[]
for count in counts:
row=(flow_id, count['date'], count['counts'])
volume.append(row)
if len(volume) == 0:
LOGGER.info(f'{len(volume)} rows fetched for flow {flow_id} of site {site_id}.')
insertFlowCounts(conn, volume)
LOGGER.info(f'Data inserted for site {site_id}.')
truncate_and_insert(conn, token, flow_id, start_date, end_date)

@task(trigger_rule='none_failed')
def pull_recent_outages():
eco_postgres, token = get_connections()
#get list of outages
outage_query = "SELECT flow_id, start_time, end_time FROM ecocounter.identify_outages('60 days'::interval);"
with eco_postgres.get_conn() as conn, conn.cursor() as curr:
curr.execute(outage_query)
recent_outages = curr.fetchall()
#for each outage, try to pull data
with eco_postgres.get_conn() as conn:
for outage in recent_outages:
flow_id, start_date, end_date = outage
truncate_and_insert(conn, token, flow_id, start_date, end_date)

t_done = ExternalTaskMarker(
task_id="done",
external_dag_id="ecocounter_check",
@@ -219,6 +213,7 @@ def data_checks():
]

(
pull_recent_outages(),
check_partitions() >>
update_sites_and_flows() >>
pull_ecocounter() >>
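For reference, the new outage backfill can be exercised outside Airflow. The snippet below is a minimal sketch, not part of this commit: it reuses `getToken`, `truncate_and_insert`, and `ecocounter.identify_outages` exactly as they appear in this diff, but the connection parameters and API credentials are placeholders.

```python
# Minimal sketch: re-pull recent Ecocounter outages without Airflow.
# All connection details and credentials below are placeholders (assumptions).
from psycopg2 import connect
from volumes.ecocounter.pull_data_from_api import getToken, truncate_and_insert

token = getToken(
    'https://api.example.com',  # placeholder API host
    'api_user',                 # placeholder login
    'api_password',             # placeholder password
    'secret_api_hash'           # placeholder secret hash
)

outage_query = "SELECT flow_id, start_time, end_time FROM ecocounter.identify_outages('60 days'::interval);"

with connect(host='localhost', dbname='bigdata', user='ecocounter_bot') as conn:
    with conn.cursor() as cur:
        cur.execute(outage_query)
        recent_outages = cur.fetchall()
    # For each outage run, truncate the affected window and re-insert from the API.
    for flow_id, start_date, end_date in recent_outages:
        truncate_and_insert(conn, token, flow_id, start_date, end_date)
```

Note that the DAG task above opens the connection twice (once to fetch the outage list, once to insert); the sketch reuses a single connection for brevity.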
84 changes: 84 additions & 0 deletions volumes/ecocounter/functions/create-function-identify-outages.sql
@@ -0,0 +1,84 @@
CREATE OR REPLACE FUNCTION ecocounter.identify_outages(
num_days interval
)
RETURNS TABLE (
flow_id numeric,
start_time timestamp,
end_time timestamp
)
LANGUAGE plpgsql
COST 100
VOLATILE

AS $BODY$

BEGIN

RETURN QUERY
WITH ongoing_outages AS (
SELECT
f.flow_id,
f.site_id,
dates.dt::date,
dates.dt - lag(dates.dt) OVER w = interval '1 day' AS consecutive
FROM ecocounter.flows_unfiltered AS f
CROSS JOIN
generate_series(
now()::date - num_days,
now()::date - interval '2 day', --2 bc last interval will be this + 1 day
interval '1 day'
) AS dates (dt)
LEFT JOIN ecocounter.counts_unfiltered AS c
ON c.flow_id = f.flow_id
AND c.datetime_bin >= dates.dt
AND c.datetime_bin < dates.dt + interval '1 day'
--select counts partitions
AND c.datetime_bin >= now()::date - num_days
AND c.datetime_bin < now()::date - interval '1 day'
WHERE
f.validated
AND dates.dt < COALESCE(f.date_decommissioned, now()::date - interval '1 day')
GROUP BY
f.flow_id,
f.site_id,
f.validated,
f.last_active,
f.date_decommissioned,
dates.dt
HAVING SUM(c.volume) IS NULL
WINDOW w AS (PARTITION BY f.flow_id ORDER BY dates.dt)
ORDER BY
f.flow_id,
dates.dt
),

group_ids AS (
SELECT
oo.flow_id,
oo.dt,
SUM(CASE WHEN oo.consecutive IS TRUE THEN 0 ELSE 1 END) OVER w AS group_id
FROM ongoing_outages AS oo
WINDOW w AS (PARTITION BY oo.flow_id ORDER BY oo.dt)
)

SELECT
gi.flow_id,
MIN(gi.dt)::timestamp AS start_time,
MAX(gi.dt) + interval '1 day' AS end_time
FROM group_ids AS gi
GROUP BY
gi.flow_id,
gi.group_id;

END;
$BODY$;

ALTER FUNCTION ecocounter.identify_outages(interval) OWNER TO ecocounter_admins;
GRANT ALL ON FUNCTION ecocounter.identify_outages(interval) TO ecocounter_admins;

GRANT EXECUTE ON FUNCTION ecocounter.identify_outages(interval) TO bdit_humans;
GRANT EXECUTE ON FUNCTION ecocounter.identify_outages(interval) TO ecocounter_bot;

COMMENT ON FUNCTION ecocounter.identify_outages(interval)
IS 'A function to identify day-level outages (null volume) in Ecocounter data and group
them into runs for ease of pulling. Used by Airflow ecocounter_pull.pull_recent_outages task.';
31 changes: 18 additions & 13 deletions volumes/ecocounter/pull_data_from_api.py
@@ -1,10 +1,13 @@
import requests
import logging
from configparser import ConfigParser
from psycopg2 import connect
from psycopg2.extras import execute_values
from datetime import datetime, timedelta
from airflow.exceptions import AirflowFailException

LOGGER = logging.getLogger(__name__)

default_start = datetime.now().replace(hour = 0, minute = 0, second = 0, microsecond = 0)-timedelta(days=1)
default_end = datetime.now().replace(hour = 0, minute = 0, second = 0, microsecond = 0)

@@ -148,6 +151,20 @@ def insertFlow(conn: any, flow_id: int, site_id: int, flow_name: str, bin_size:
with conn.cursor() as cur:
cur.execute(insert_query, (flow_id, site_id, flow_name, bin_size))

def truncate_and_insert(conn, token, flow_id, start_date, end_date):
LOGGER.info(f'Attempting to fetch data for flow {flow_id} from {start_date} to {end_date}.')
# empty the count table for this flow
truncateFlowSince(flow_id, conn, start_date, end_date)
# and fill it back up!
counts = getFlowData(token, flow_id, start_date, end_date)
#convert response into a tuple for inserting
volume=[]
for count in counts:
row=(flow_id, count['date'], count['counts'])
volume.append(row)
LOGGER.info(f'{len(volume)} rows fetched for flow {flow_id} from {start_date} to {end_date}.')
insertFlowCounts(conn, volume)

#for testing/pulling data without use of airflow.
def run_api(
start_date: datetime = default_start,
@@ -177,16 +194,4 @@ def run_api(
if not flowIsKnownToUs(flow_id, conn):
print('unknown flow', flow_id)
continue
# we do have this site and flow in the database; let's update its counts
print(f'starting on flow {flow_id}')
# empty the count table for this flow
truncateFlowSince(flow_id, conn, start_date, end_date)
# and fill it back up!
print(f'fetching data for flow {flow_id}')
counts = getFlowData(token, flow_id, start_date, end_date)
print(f'inserting data for flow {flow_id}')
volume=[]
for count in counts:
row=(flow_id, count['date'], count['counts'])
volume.append(row)
insertFlowCounts(conn, volume)
truncate_and_insert(conn, token, flow_id, start_date, end_date)
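Since `run_api` is the documented entry point for pulling data without Airflow, a manual backfill might look like the sketch below. Its full signature is truncated in this diff, so the assumption that the remaining keyword arguments carry defaults (as `start_date` does) is exactly that: an assumption.

```python
# Hypothetical manual backfill, bypassing Airflow. Assumes run_api's
# remaining (truncated) parameters have defaults like start_date does.
from datetime import datetime
from volumes.ecocounter.pull_data_from_api import run_api

if __name__ == '__main__':
    run_api(start_date=datetime(2024, 7, 1))
```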
23 changes: 13 additions & 10 deletions volumes/ecocounter/readme.md
@@ -1,3 +1,5 @@
# Ecocounter <!-- omit in toc -->

<!-- TOC -->

- [Bicycle loop detectors](#bicycle-loop-detectors)
@@ -9,8 +11,6 @@
- [Note](#note)
- [Historical data](#historical-data)
- [`ecocounter_pull` DAG](#ecocounter_pull-dag)
- [`check_partitions` TaskGroup](#check_partitions-taskgroup)
- [`data_checks` TaskGroup](#data_checks-taskgroup)
- [`ecocounter_check` DAG](#ecocounter_check-dag)
- [SQL Tables](#sql-tables)
- [Main Tables](#main-tables)
@@ -112,25 +112,28 @@ LIMIT 1000;
```

<!-- ecocounter_pull_doc_md -->

## `ecocounter_pull` DAG
The `ecocounter_pull` DAG runs daily at 3am to populate the `ecocounter` schema with new data.

### `check_partitions` TaskGroup
- `check_annual_partition` checks if execution date is January 1st.
- `create_annual_partitions` creates a new annual partition for `ecocounter.counts_unfiltered` if previous task succeeds.
- The `pull_recent_outages` task is similar to the `pull_ecocounter` task, except that it tries to re-pull data corresponding to zero-volume outages within the last 60 days. This was implemented after finding that some Ecocounters suddenly backfill missing data once their spotty cellular signal recovers. A maximum of roughly two weeks of backfilling has been observed, so the task conservatively looks back 60 days.

- `check_partitions` TaskGroup
- `check_annual_partition` checks if execution date is January 1st.
- `create_annual_partitions` creates a new annual partition for `ecocounter.counts_unfiltered` if previous task succeeds.

- The `update_sites_and_flows` task identifies any sites and "flows" (known as channels in the API) that do not yet exist in our database and adds them to `ecocounter.sites_unfiltered` and `ecocounter.flows_unfiltered`. The new rows carry a flag `validated = null`, indicating they still need to be manually validated. A notification is sent with any new additions.
- The `pull_ecocounter` task pulls data from the Ecocounter API and inserts it into the `ecocounter.counts_unfiltered` table.
- `done` is an external task marker to trigger the `ecocounter_check` DAG for additional "yellow card" data checks.

### `data_checks` TaskGroup
This task group runs data quality checks on the pipeline output.
- `wait_for_weather` delays the downstream data check by a few hours until the historical weather is available to add context.
- `check_volume` checks the sum of volume in `ecocounter.counts` (filtered view) and notifies if less than 70% of the 60 day lookback avg.
- `check_distinct_flow_ids` checks the count of distinct flow_ids appearing in `ecocounter.counts` (filtered view) and notifies if less than 70% of the 60 day lookback avg.
- `data_checks` TaskGroup: This task group runs data quality checks on the pipeline output.
  - `wait_for_weather` delays the downstream data checks by a few hours until the historical weather data is available to add context.
  - `check_volume` checks the sum of volume in `ecocounter.counts` (filtered view) and notifies if it is less than 70% of the 60-day lookback average.
  - `check_distinct_flow_ids` checks the count of distinct flow_ids appearing in `ecocounter.counts` (filtered view) and notifies if it is less than 70% of the 60-day lookback average.
<!-- ecocounter_pull_doc_md -->

<!-- ecocounter_check_doc_md -->

## `ecocounter_check` DAG
The `ecocounter_check` DAG runs daily at 4am following completion of `ecocounter_pull` to perform additional "yellow card" data checks on the new data.

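For intuition, the `check_volume` threshold described in the readme diff above amounts to comparing yesterday's total volume against the trailing 60-day daily average. The following is a rough standalone sketch, not the DAG's actual check SQL; it assumes `ecocounter.counts` exposes `volume` and `datetime_bin` columns, as its unfiltered parent table does.

```python
# Rough sketch of the 70%-of-60-day-average volume check (not the DAG's actual SQL).
# Assumes ecocounter.counts has volume and datetime_bin columns.
from airflow.providers.postgres.hooks.postgres import PostgresHook

def volume_ok(ds: str, threshold: float = 0.7) -> bool:
    sql = """
        SELECT
            SUM(volume) FILTER (WHERE datetime_bin >= %(ds)s::date) AS day_total,
            SUM(volume) FILTER (WHERE datetime_bin < %(ds)s::date) / 60.0 AS lookback_daily_avg
        FROM ecocounter.counts
        WHERE datetime_bin >= %(ds)s::date - interval '60 days'
            AND datetime_bin < %(ds)s::date + interval '1 day'
    """
    day_total, daily_avg = PostgresHook("ecocounter_bot").get_first(sql, parameters={'ds': ds})
    if day_total is None or daily_avg is None:
        return False  # no data at all counts as a failure
    return day_total >= threshold * daily_avg
```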
1 change: 1 addition & 0 deletions volumes/ecocounter/tables/flows_unfiltered.sql
@@ -11,6 +11,7 @@ CREATE TABLE ecocounter.flows_unfiltered (
validated boolean,
first_active timestamp without time zone,
last_active timestamp without time zone,
date_decommissioned timestamp without time zone,
CONSTRAINT locations_pkey PRIMARY KEY (flow_id),
CONSTRAINT flows_replaced_by_flow_id_fkey FOREIGN KEY (replaced_by_flow_id)
REFERENCES ecocounter.flows_unfiltered (flow_id) MATCH SIMPLE
1 change: 1 addition & 0 deletions volumes/ecocounter/tables/sites_unfiltered.sql
@@ -9,6 +9,7 @@ CREATE TABLE ecocounter.sites_unfiltered (
centreline_id integer,
first_active timestamp without time zone,
last_active timestamp without time zone,
date_decommissioned timestamp without time zone,
CONSTRAINT sites_pkey PRIMARY KEY (site_id),
CONSTRAINT sites_replaced_by_fkey FOREIGN KEY (replaced_by_site_id)
REFERENCES ecocounter.sites_unfiltered (site_id) MATCH SIMPLE
3 changes: 2 additions & 1 deletion volumes/ecocounter/views/create-view-flows.sql
@@ -10,7 +10,8 @@ CREATE OR REPLACE VIEW ecocounter.flows AS (
replaces_flow_id,
includes_contraflow,
first_active,
last_active
last_active,
date_decommissioned
FROM ecocounter.flows_unfiltered
WHERE validated
);
3 changes: 2 additions & 1 deletion volumes/ecocounter/views/create-view-sites.sql
@@ -8,7 +8,8 @@ CREATE OR REPLACE VIEW ecocounter.sites AS (
replaced_by_site_id,
centreline_id,
first_active,
last_active
last_active,
date_decommissioned
FROM ecocounter.sites_unfiltered
WHERE validated
);
