Skip to content

Commit

Permalink
Merge pull request #4 from jvstein/missing_task_durations
Browse files: browse the repository at this point in the history
Include missing task duration when most recent DagRun fails
  • Loading branch information
Abhishek Ray authored Sep 20, 2019
2 parents e4fbc45 + e5653b9 commit fa449be
Showing 1 changed file with 12 additions and 36 deletions.
48 changes: 12 additions & 36 deletions airflow_prometheus_exporter/prometheus_exporter.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -168,56 +168,30 @@ def get_task_duration_info():
DagRun.dag_id
).subquery()

task_duration_query = session.query(
return session.query(
TaskInstance.dag_id,
TaskInstance.task_id,
func.max(TaskInstance.execution_date).label('max_execution_dt')
).filter(
TaskInstance.state == State.SUCCESS,
TaskInstance.start_date.isnot(None),
TaskInstance.end_date.isnot(None),
).group_by(
TaskInstance.dag_id,
TaskInstance.task_id
).subquery()

task_latest_execution_dt = session.query(
task_duration_query.c.dag_id,
task_duration_query.c.task_id,
task_duration_query.c.max_execution_dt.label('execution_date'),
TaskInstance.start_date,
TaskInstance.end_date,
TaskInstance.execution_date
).join(
max_execution_dt_query,
and_(
(
task_duration_query.c.dag_id
TaskInstance.dag_id
==
max_execution_dt_query.c.dag_id
),
(
task_duration_query.c.max_execution_dt
==
max_execution_dt_query.c.max_execution_dt
),
)
).subquery()

return session.query(
task_latest_execution_dt.c.dag_id,
task_latest_execution_dt.c.task_id,
TaskInstance.start_date,
TaskInstance.end_date,
task_latest_execution_dt.c.execution_date,
).join(
TaskInstance,
and_(
TaskInstance.dag_id == task_latest_execution_dt.c.dag_id,
TaskInstance.task_id == task_latest_execution_dt.c.task_id,
(
TaskInstance.execution_date
==
task_latest_execution_dt.c.execution_date
max_execution_dt_query.c.max_execution_dt
),
)
).filter(
TaskInstance.state == State.SUCCESS,
TaskInstance.start_date.isnot(None),
TaskInstance.end_date.isnot(None)
).all()

######################
Expand Down Expand Up @@ -262,6 +236,8 @@ def get_task_scheduler_delay():
TaskInstance.queue == task_status_query.c.queue,
TaskInstance.start_date == task_status_query.c.max_start,
)
).filter(
TaskInstance.dag_id == CANARY_DAG, # Redundant, for performance.
).all()


Expand Down

0 comments on commit fa449be

Please sign in to comment.