984 ecocounter pull recent outages #1014

Merged Jul 31, 2024 (15 commits). Showing changes from 5 commits.
57 changes: 26 additions & 31 deletions dags/ecocounter_pull.py
@@ -31,9 +31,8 @@
from dags.common_tasks import check_jan_1st, wait_for_weather_timesensor
from dags.custom_operators import SQLCheckOperatorWithReturnValue
from volumes.ecocounter.pull_data_from_api import (
getToken, getSites, getFlowData, siteIsKnownToUs, insertSite,
insertFlow, flowIsKnownToUs, truncateFlowSince, insertFlowCounts,
getKnownSites, getKnownFlows
getToken, getSites, siteIsKnownToUs, insertSite, insertFlow,
flowIsKnownToUs, getKnownSites, getKnownFlows, truncate_and_insert
)
except:
raise ImportError("Cannot import DAG helper functions.")
@@ -85,9 +84,8 @@ def check_partitions():
)

check_jan_1st.override(task_id="check_annual_partition")() >> create_annual_partition

@task(trigger_rule='none_failed')
def update_sites_and_flows(**context):

def get_connections():
api_conn = BaseHook.get_connection('ecocounter_api_key')
token = getToken(
api_conn.host,
@@ -96,7 +94,11 @@ def update_sites_and_flows(**context):
api_conn.extra_dejson['secret_api_hash']
)
eco_postgres = PostgresHook("ecocounter_bot")
return eco_postgres, token

@task(trigger_rule='none_failed')
def update_sites_and_flows(**context):
eco_postgres, token = get_connections()
new_sites, new_flows = [], []
with eco_postgres.get_conn() as conn:
for site in getSites(token):
@@ -140,38 +142,30 @@

@task(trigger_rule='none_failed')
def pull_ecocounter(ds):
api_conn = BaseHook.get_connection('ecocounter_api_key')
token = getToken(
api_conn.host,
api_conn.login,
api_conn.password,
api_conn.extra_dejson['secret_api_hash']
)
eco_postgres = PostgresHook("ecocounter_bot")

eco_postgres, token = get_connections()
start_date = dateutil.parser.parse(str(ds))
end_date = dateutil.parser.parse(str(ds_add(ds, 1)))
LOGGER.info(f'Pulling data from {start_date} to {end_date}.')
with eco_postgres.get_conn() as conn:
for site_id in getKnownSites(conn):
LOGGER.debug(f'Starting on site {site_id}.')
for flow_id in getKnownFlows(conn, site_id):
LOGGER.debug(f'Starting on flow {flow_id} for site {site_id}.')
# empty the count table for this flow
truncateFlowSince(flow_id, conn, start_date, end_date)
# and fill it back up!
LOGGER.debug(f'Fetching data for flow {flow_id}.')
counts = getFlowData(token, flow_id, start_date, end_date)
#convert response into a tuple for inserting
volume=[]
for count in counts:
row=(flow_id, count['date'], count['counts'])
volume.append(row)
if len(volume) == 0:
LOGGER.info(f'{len(volume)} rows fetched for flow {flow_id} of site {site_id}.')
insertFlowCounts(conn, volume)
LOGGER.info(f'Data inserted for site {site_id}.')
truncate_and_insert(conn, token, flow_id, start_date, end_date)

@task(trigger_rule='none_failed')
def pull_recent_outages():
eco_postgres, token = get_connections()
#get list of outages
outage_query = "SELECT flow_id, date_start, date_end FROM ecocounter.recent_outages;"
with eco_postgres.get_conn() as conn, conn.cursor() as curr:
curr.execute(outage_query)
recent_outages = curr.fetchall()
#for each outage, try to pull data
with eco_postgres.get_conn() as conn:
for outage in recent_outages:
flow_id, start_date, end_date = outage
truncate_and_insert(conn, token, flow_id, start_date, end_date)

t_done = ExternalTaskMarker(
task_id="done",
external_dag_id="ecocounter_check",
@@ -219,6 +213,7 @@ def data_checks():
]

(
pull_recent_outages(),
check_partitions() >>
update_sites_and_flows() >>
pull_ecocounter() >>
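
For context, the DAG refactor above moves the API token and Postgres connection setup into a shared get_connections() helper, and the per-flow truncate/fetch/insert cycle into truncate_and_insert() (defined in pull_data_from_api.py below), so the new pull_recent_outages task can reuse both. A minimal sketch of calling the shared helper outside Airflow follows; the host, credentials, DSN, and flow_id are hypothetical placeholders, not values from this PR.

    # Illustrative sketch only -- not part of the PR. Re-pulls one flow over one
    # outage window using the helper introduced here.
    from datetime import datetime
    from psycopg2 import connect

    from volumes.ecocounter.pull_data_from_api import getToken, truncate_and_insert

    token = getToken(
        'https://api.eco-counter.example',  # hypothetical API host
        'api_user', 'api_password', 'secret_api_hash_value'
    )

    with connect('dbname=bigdata host=localhost') as conn:  # hypothetical DSN
        # wipes counts for this flow in [start_date, end_date) and re-inserts them
        truncate_and_insert(
            conn, token,
            flow_id=100012345,                # hypothetical flow
            start_date=datetime(2024, 7, 1),
            end_date=datetime(2024, 7, 8),
        )
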
31 changes: 18 additions & 13 deletions volumes/ecocounter/pull_data_from_api.py
@@ -1,10 +1,13 @@
import requests
import logging
from configparser import ConfigParser
from psycopg2 import connect
from psycopg2.extras import execute_values
from datetime import datetime, timedelta
from airflow.exceptions import AirflowFailException

LOGGER = logging.getLogger(__name__)

default_start = datetime.now().replace(hour = 0, minute = 0, second = 0, microsecond = 0)-timedelta(days=1)
default_end = datetime.now().replace(hour = 0, minute = 0, second = 0, microsecond = 0)

@@ -148,6 +151,20 @@ def insertFlow(conn: any, flow_id: int, site_id: int, flow_name: str, bin_size:
with conn.cursor() as cur:
cur.execute(insert_query, (flow_id, site_id, flow_name, bin_size))

def truncate_and_insert(conn, token, flow_id, start_date, end_date):
LOGGER.info(f'Attempting to fetch data for flow {flow_id} from {start_date} to {end_date}.')
# empty the count table for this flow
truncateFlowSince(flow_id, conn, start_date, end_date)
# and fill it back up!
counts = getFlowData(token, flow_id, start_date, end_date)
#convert response into a tuple for inserting
volume=[]
for count in counts:
row=(flow_id, count['date'], count['counts'])
volume.append(row)
LOGGER.info(f'{len(volume)} rows fetched for flow {flow_id} from {start_date} to {end_date}.')
insertFlowCounts(conn, volume)

#for testing/pulling data without use of airflow.
def run_api(
start_date: datetime = default_start,
@@ -177,16 +194,4 @@ def run_api(
if not flowIsKnownToUs(flow_id, conn):
print('unknown flow', flow_id)
continue
# we do have this site and flow in the database; let's update its counts
print(f'starting on flow {flow_id}')
# empty the count table for this flow
truncateFlowSince(flow_id, conn, start_date, end_date)
# and fill it back up!
print(f'fetching data for flow {flow_id}')
counts = getFlowData(token, flow_id, start_date, end_date)
print(f'inserting data for flow {flow_id}')
volume=[]
for count in counts:
row=(flow_id, count['date'], count['counts'])
volume.append(row)
insertFlowCounts(conn, volume)
truncate_and_insert(conn, token, flow_id, start_date, end_date)
1 change: 1 addition & 0 deletions volumes/ecocounter/tables/flows_unfiltered.sql
@@ -11,6 +11,7 @@ CREATE TABLE ecocounter.flows_unfiltered (
validated boolean,
first_active timestamp without time zone,
last_active timestamp without time zone,
date_decommissioned timestamp without time zone,
CONSTRAINT locations_pkey PRIMARY KEY (flow_id),
CONSTRAINT flows_replaced_by_flow_id_fkey FOREIGN KEY (replaced_by_flow_id)
REFERENCES ecocounter.flows_unfiltered (flow_id) MATCH SIMPLE
3 changes: 2 additions & 1 deletion volumes/ecocounter/views/create-view-flows.sql
@@ -10,7 +10,8 @@ CREATE OR REPLACE VIEW ecocounter.flows AS (
replaces_flow_id,
includes_contraflow,
first_active,
last_active
last_active,
date_decommissioned
FROM ecocounter.flows_unfiltered
WHERE validated
);
73 changes: 73 additions & 0 deletions volumes/ecocounter/views/create-view-recent-outages.sql
@@ -0,0 +1,73 @@
CREATE OR REPLACE VIEW ecocounter.recent_outages AS (

Review comments on this line:
Collaborator: can we rename to something like last_week_outages or something similar?
Collaborator: month**
Author: even more generic: identify_outages

WITH ongoing_outages AS (
SELECT
f.flow_id,
f.site_id,
dates.dt,
dates.dt - lag(dates.dt) OVER w = interval '1 day' AS consecutive
FROM ecocounter.flows_unfiltered AS f
CROSS JOIN
generate_series(
now()::date - interval '31 days',
now()::date - interval '2 day',
interval '1 day'
) AS dates (dt)
LEFT JOIN ecocounter.counts_unfiltered AS c
ON c.flow_id = f.flow_id
AND c.datetime_bin >= dates.dt
AND c.datetime_bin < dates.dt + interval '1 day'
--select counts partitions
AND c.datetime_bin >= now()::date - interval '31 days'
AND c.datetime_bin < now()::date - interval '1 day'
WHERE
f.validated
AND dates.dt < COALESCE(f.date_decommissioned, now()::date - interval '1 day')
GROUP BY
f.flow_id,
f.site_id,
f.validated,
f.last_active,
f.date_decommissioned,
dates.dt
HAVING
COALESCE(SUM(c.volume), 0) = 0 --null or zero
WINDOW w AS (PARTITION BY f.flow_id ORDER BY dates.dt)
ORDER BY
f.flow_id,
dates.dt
),

group_ids AS (
SELECT
flow_id,
site_id,
dt,
SUM(CASE WHEN consecutive IS TRUE THEN 0 ELSE 1 END) OVER w AS group_id
FROM ongoing_outages
WINDOW w AS (PARTITION BY flow_id ORDER BY dt)
)

SELECT
flow_id,
site_id,
MIN(dt) AS date_start,
MAX(dt) + interval '1 day' AS date_end
FROM group_ids
GROUP BY
flow_id,
site_id,
group_id
);

ALTER VIEW ecocounter.recent_outages OWNER TO ecocounter_admins;
GRANT ALL ON TABLE ecocounter.recent_outages TO ecocounter_admins;

REVOKE ALL ON TABLE ecocounter.recent_outages FROM bdit_humans;
GRANT SELECT ON TABLE ecocounter.recent_outages TO bdit_humans;

GRANT SELECT ON TABLE ecocounter.recent_outages TO ecocounter_bot;

COMMENT ON VIEW ecocounter.recent_outages
IS 'A view to identify recent outages in Ecocounter data and group
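
The grouping step in this view is a gaps-and-islands pattern: each day with zero or missing counts is compared to the previous such day for the same flow, and a new group_id begins whenever the days are not consecutive. Below is a rough Python equivalent of that grouping for a single flow, purely as an illustration of the logic; the function name and inputs are hypothetical and not part of the PR.

    from datetime import date, timedelta

    def group_outage_days(missing_days):
        # Illustration only: mirrors the ongoing_outages/group_ids logic of the view.
        # missing_days: days with zero or no counts for a single flow.
        runs = []
        for day in sorted(missing_days):
            if runs and day - runs[-1][1] == timedelta(days=1):
                runs[-1][1] = day            # consecutive day extends the current run
            else:
                runs.append([day, day])      # a gap starts a new run (new group_id)
        # date_end is exclusive, matching MAX(dt) + interval '1 day' in the view
        return [(start, end + timedelta(days=1)) for start, end in runs]

    # e.g. missing counts on Jul 3-5 and Jul 9 yield two outage runs:
    # [(2024-07-03, 2024-07-06), (2024-07-09, 2024-07-10)]
    group_outage_days([date(2024, 7, 3), date(2024, 7, 4), date(2024, 7, 5), date(2024, 7, 9)])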
them into runs for ease of pulling.';