Skip to content

Commit

Permalink
Update prometheus_exporter.py
Browse files Browse the repository at this point in the history
remove dag_duration temporarily. see robinhood#26
  • Loading branch information
qiuwei authored Jul 21, 2020
1 parent 078a151 commit 648d109
Showing 1 changed file with 0 additions and 76 deletions.
76 changes: 0 additions & 76 deletions airflow_prometheus_exporter/prometheus_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,71 +62,6 @@ def get_dag_state_info():
)


def get_dag_duration_info():
"""Duration of successful DAG Runs."""
with session_scope(Session) as session:
max_execution_dt_query = (
session.query(
DagRun.dag_id,
func.max(DagRun.execution_date).label("max_execution_dt"),
)
.join(DagModel, DagModel.dag_id == DagRun.dag_id)
.filter(
DagModel.is_active == True, # noqa
DagModel.is_paused == False,
DagRun.state == State.SUCCESS,
DagRun.end_date.isnot(None),
)
.group_by(DagRun.dag_id)
.subquery()
)

dag_start_dt_query = (
session.query(
max_execution_dt_query.c.dag_id,
max_execution_dt_query.c.max_execution_dt.label(
"execution_date"
),
func.min(TaskInstance.start_date).label("start_date"),
)
.join(
TaskInstance,
and_(
TaskInstance.dag_id == max_execution_dt_query.c.dag_id,
(
TaskInstance.execution_date
== max_execution_dt_query.c.max_execution_dt
),
),
)
.group_by(
max_execution_dt_query.c.dag_id,
max_execution_dt_query.c.max_execution_dt,
)
.subquery()
)

return (
session.query(
dag_start_dt_query.c.dag_id,
dag_start_dt_query.c.start_date,
DagRun.end_date,
)
.join(
DagRun,
and_(
DagRun.dag_id == dag_start_dt_query.c.dag_id,
DagRun.execution_date
== dag_start_dt_query.c.execution_date,
),
)
.filter(
TaskInstance.start_date.isnot(None),
TaskInstance.end_date.isnot(None),
)
.all()
)


######################
# Task Related Metrics
Expand Down Expand Up @@ -406,17 +341,6 @@ def collect(self):
d_state.add_metric([dag.dag_id, dag.owners, dag.state], dag.count)
yield d_state

dag_duration = GaugeMetricFamily(
"airflow_dag_run_duration",
"Duration of successful dag_runs in seconds",
labels=["dag_id"],
)
for dag in get_dag_duration_info():
dag_duration_value = (
dag.end_date - dag.start_date
).total_seconds()
dag_duration.add_metric([dag.dag_id], dag_duration_value)
yield dag_duration

# Scheduler Metrics
dag_scheduler_delay = GaugeMetricFamily(
Expand Down

0 comments on commit 648d109

Please sign in to comment.