-
Notifications
You must be signed in to change notification settings - Fork 24
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Hasan Ozturk
committed
Jan 29, 2024
1 parent
d5f9921
commit dc9b8c3
Showing
1 changed file
with
75 additions
and
257 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,5 @@ | ||
#!/usr/bin/env python3 | ||
# Copyright 2012-2020 CERN | ||
# Copyright 2012-2024 CERN | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
|
@@ -17,6 +17,7 @@ | |
# - Donata Mielaikaite, <[email protected]>, 2020 | ||
# - Eric Vaandering, <[email protected]>, 2021 | ||
# - Fernando Garzon, [email protected], 2022 | ||
# - Hasan Ozturk, haozturk AT cern DOT ch, 2024 | ||
|
||
|
||
""" | ||
|
@@ -38,273 +39,90 @@ from sqlalchemy import func | |
|
||
from utils import common | ||
|
||
PrometheusPusher = common.PrometheusPusher | ||
probe_metrics = common.probe_metrics | ||
|
||
PROM_SERVERS = config_get('monitor', 'prometheus_servers', raise_exception=False, default='') | ||
if PROM_SERVERS != '': | ||
PROM_SERVERS = PROM_SERVERS.split(',') | ||
else: | ||
PROM_SERVERS = None | ||
|
||
prom_labels_config = config_get('monitor', 'prometheus_labels', raise_exception=False, default='{}') | ||
extra_prom_labels = json.loads(prom_labels_config) | ||
|
||
# Exit statuses | ||
OK, WARNING, CRITICAL, UNKNOWN = 0, 1, 2, 3 | ||
|
||
if __name__ == '__main__': | ||
registry = CollectorRegistry() | ||
|
||
rule_count_labels = ['state'] | ||
not_ok_labels = ['Not_ok_rules'] | ||
stuck_cnt_labels = ['Stuck_cnt'] | ||
replicating_cnt_labels = ['Replicating_cnt'] | ||
rules_by_activity_labels = ['state', 'activity'] | ||
locks_by_activity_labels = ['state', 'activity'] | ||
|
||
rule_count_labels.extend(extra_prom_labels.keys()) | ||
not_ok_labels.extend(extra_prom_labels.keys()) | ||
stuck_cnt_labels.extend(extra_prom_labels.keys()) | ||
replicating_cnt_labels.extend(extra_prom_labels.keys()) | ||
rules_by_activity_labels.extend(extra_prom_labels.keys()) | ||
locks_by_activity_labels.extend(extra_prom_labels.keys()) | ||
|
||
rules_count_gauge = Gauge('rucio_rules_count', 'Number of rules in a given state', | ||
labelnames=rule_count_labels, registry=registry) | ||
not_ok_rules_gauge = Gauge('rucio_not_ok_rules', 'Number of not OK rules', | ||
labelnames=not_ok_labels, registry=registry) | ||
stuck_cnt_gauge = Gauge('rucio_stuck_cnt', 'Number of stuck files', | ||
labelnames=stuck_cnt_labels, registry=registry) | ||
replicating_cnt_gauge = Gauge('rucio_replicating_cnt', 'Number of replicating files', | ||
labelnames=replicating_cnt_labels, registry=registry) | ||
rule_count_by_activity_gauge = Gauge('rucio_rules_states_by_ativity_cnt', 'Number of S/R/U rules by activity', | ||
labelnames=rules_by_activity_labels, registry=registry) | ||
locks_count_by_activity_gauge = Gauge('rucio_locks_states_by_ativity_cnt', 'Number of S/R locks by activity', | ||
labelnames=locks_by_activity_labels, registry=registry) | ||
|
||
try: | ||
session = get_session() | ||
|
||
# check rules | ||
state_map = {'REPLICATING': 'rules_replicating', | ||
'OK': 'rules_ok', | ||
'INJECT': 'rules_injecting', | ||
'STUCK': 'rules_stuck', | ||
'SUSPENDED': 'rules_suspend', | ||
'WAITING_APPROVAL': 'rules_waiting_approval', } | ||
|
||
ages = { | ||
'created_24hours_ago': datetime.timedelta(days=1), | ||
'created_1week_ago': datetime.timedelta(days=7), | ||
'created_3weeks_ago': datetime.timedelta(days=21), | ||
'created_3months_ago': datetime.timedelta(days=90), | ||
'created_6months_ago': datetime.timedelta(days=180), | ||
'created_12months_ago': datetime.timedelta(days=365), | ||
} | ||
|
||
result = (session.query(models.ReplicationRule.state, func.count(models.ReplicationRule.state)) | ||
.group_by(models.ReplicationRule.state) | ||
.with_hint(models.ReplicationRule, 'INDEX_FFS(rules RULES_PK)', 'oracle') | ||
.all()) | ||
|
||
for state, num in result: | ||
# Count Rules in x state | ||
gauge_state = state_map.get(str(state.name), 'rules_' + str(state.name).lower()) | ||
print('rules.count.%s %s' % (gauge_state, num)) | ||
|
||
prom_labels = {'state': gauge_state} | ||
prom_labels.update(extra_prom_labels) | ||
rules_count_gauge.labels(**prom_labels).set(num) | ||
|
||
probe_metrics.gauge(name='rules.count.{state}', | ||
documentation='Number of rules in a given state').labels(state=gauge_state).set(num) | ||
|
||
# Count Rules in x state ordered by activity | ||
results = (session.query(models.ReplicationRule.state, models.ReplicationRule.activity, func.count(models.ReplicationRule.state)) | ||
.filter(models.ReplicationRule.state != RuleState.OK) | ||
.group_by(models.ReplicationRule.activity, models.ReplicationRule.state) | ||
.with_hint(models.ReplicationRule, 'INDEX_FFS(rules RULES_PK)', 'oracle') | ||
.all()) | ||
|
||
for result in results: | ||
print(result[0], result[1], result[2]) | ||
prom_labels = {'state': result[0], 'activity': result[1]} | ||
prom_labels.update(extra_prom_labels) | ||
rule_count_by_activity_gauge.labels(**prom_labels).set(result[2]) | ||
|
||
# Count Locks in S/R state ordered by activity | ||
print('Count Locks in S/R state ordered by activity') | ||
results = (session.query(models.ReplicationRule.activity, | ||
func.sum(models.ReplicationRule.locks_stuck_cnt), | ||
func.sum(models.ReplicationRule.locks_replicating_cnt)) | ||
.group_by(models.ReplicationRule.activity) | ||
.with_hint(models.ReplicationRule, 'INDEX_FFS(rules RULES_PK)', 'oracle') | ||
.all()) | ||
|
||
for result in results: | ||
prom_labels = {'state': 'stuck', 'activity': result[0]} | ||
prom_labels.update(extra_prom_labels) | ||
locks_count_by_activity_gauge.labels(**prom_labels).set(result[1]) | ||
prom_labels = {'state': 'replicating', 'activity': result[0]} | ||
prom_labels.update(extra_prom_labels) | ||
locks_count_by_activity_gauge.labels(**prom_labels).set(result[2]) | ||
|
||
# Not Ok rules | ||
query = session.query(models.ReplicationRule.scope).filter(models.ReplicationRule.state != RuleState.OK) | ||
result = get_count(query) | ||
|
||
prom_labels = {'Not_ok_rules': 'Not_ok_rules'} | ||
prom_labels.update(extra_prom_labels) | ||
not_ok_rules_gauge.labels(**prom_labels).set(result) | ||
|
||
probe_metrics.gauge(name='judge.total_not_OK_rules', documentation='Number of not OK rules').set(result) | ||
|
||
# Stuck cnt | ||
query = (session.query(func.sum(models.ReplicationRule.locks_stuck_cnt)) | ||
.filter(models.ReplicationRule.state == RuleState.STUCK)) | ||
result = query.scalar() or 0 | ||
|
||
print('rules.no_of_files.total.sum_locks_stuck_cnt %s' % (result)) | ||
|
||
prom_labels = {'Stuck_cnt': 'Stuck_cnt'} | ||
prom_labels.update(extra_prom_labels) | ||
stuck_cnt_gauge.labels(**prom_labels).set(result) | ||
|
||
probe_metrics.gauge(name='rules.no_of_files.total.sum_locks_stuck_cnt', | ||
documentation='Number of stuck files').set(result) | ||
|
||
# check left replicating files | ||
query = (session.query(func.sum(models.ReplicationRule.locks_replicating_cnt)) | ||
.filter(models.ReplicationRule.state.in_([RuleState.STUCK, RuleState.REPLICATING]))) | ||
result = query.scalar() or 0 | ||
|
||
print('rules.no_of_files.total.sum_locks_replicating_cnt %s' % (result)) | ||
|
||
prom_labels = {'Replicating_cnt': 'Replicating_cnt'} | ||
prom_labels.update(extra_prom_labels) | ||
replicating_cnt_gauge.labels(**prom_labels).set(result) | ||
|
||
probe_metrics.gauge(name='rules.no_of_files.total.sum_locks_replicating_cnt', | ||
documentation='Number of replicating files').set(result) | ||
|
||
# check stuck and replicating files which are more than X old | ||
suspended_rules_older_than_X_labels = ['age'] | ||
stuck_rules_older_than_X_labels = ['age'] | ||
replicating_rules_older_than_X_labels = ['age'] | ||
|
||
suspended_rules_older_than_X_labels.extend(extra_prom_labels.keys()) | ||
stuck_rules_older_than_X_labels.extend(extra_prom_labels.keys()) | ||
replicating_rules_older_than_X_labels.extend(extra_prom_labels.keys()) | ||
|
||
stuck_locks_older_than_X_labels = ['age'] | ||
replicating_locks_older_than_X_labels = ['age'] | ||
|
||
stuck_locks_older_than_X_labels.extend(extra_prom_labels.keys()) | ||
replicating_locks_older_than_X_labels.extend(extra_prom_labels.keys()) | ||
|
||
suspended_rules_older_than_gauge = Gauge('rucio_suspended_rules_cnt', | ||
'Number of suspended rules older than X', | ||
labelnames=suspended_rules_older_than_X_labels, | ||
registry=registry) | ||
print('suspended_rules_older_than_X_labels', suspended_rules_older_than_X_labels) | ||
stuck_rules_older_than_gauge = Gauge('rucio_stuck_rules_cnt', 'Number of stuck rules older than X', | ||
labelnames=stuck_rules_older_than_X_labels, | ||
registry=registry) | ||
|
||
replicating_rules_older_than_gauge = Gauge('rucio_replicating_rules_cnt', | ||
'Number of replicating rules older than X', | ||
labelnames=replicating_rules_older_than_X_labels, | ||
registry=registry) | ||
|
||
stuck_locks_older_than_gauge = Gauge('rucio_stuck_locks_cnt', 'Number of stuck files older than X', | ||
labelnames=stuck_locks_older_than_X_labels, | ||
registry=registry) | ||
replicating_locks_older_than_gauge = Gauge('rucio_replicating_locks_cnt', | ||
'Number of replicating files older than X', | ||
labelnames=replicating_locks_older_than_X_labels, | ||
registry=registry) | ||
|
||
for a_name, a_delta in ages.items(): | ||
timeLimit = datetime.datetime.utcnow() - a_delta | ||
|
||
# Number of Suspended rules older than x | ||
|
||
query = (session.query(func.count(models.ReplicationRule.id)) | ||
.filter(models.ReplicationRule.state == RuleState.SUSPENDED) | ||
.filter(models.ReplicationRule.created_at <= timeLimit)) | ||
result = query.scalar() or 0 | ||
print('rules.no_of_rules.suspended.%s.suspended_rules_cnt %s' % (a_name, result)) | ||
|
||
prom_labels = {'age': a_name.split('_')[1]} | ||
prom_labels.update(extra_prom_labels) | ||
|
||
suspended_rules_older_than_gauge.labels(**prom_labels).set(result) | ||
print(prom_labels, result) | ||
|
||
probe_metrics.gauge(name='rules.no_of_rules.suspended.{name}.sum_locks_stuck_cnt').labels(name=a_name).set(result) | ||
|
||
# Number of Stuck rules older than x | ||
|
||
query = (session.query(func.count(models.ReplicationRule.id)) | ||
.filter(models.ReplicationRule.state == RuleState.STUCK) | ||
.filter(models.ReplicationRule.created_at <= timeLimit)) | ||
result = query.scalar() or 0 | ||
print('rules.no_of_rules.stuck.%s.stuck_rules_cnt %s' % (a_name, result)) | ||
|
||
stuck_rules_older_than_gauge.labels(**prom_labels).set(result) | ||
print(prom_labels, result) | ||
|
||
probe_metrics.gauge(name='rules.no_of_rules.stuck.{name}.sum_locks_stuck_cnt').labels(name=a_name).set(result) | ||
|
||
# Number of replicating rules older than x | ||
|
||
query = (session.query(func.count(models.ReplicationRule.id)) | ||
.filter(models.ReplicationRule.state == RuleState.REPLICATING) | ||
.filter(models.ReplicationRule.created_at <= timeLimit)) | ||
result = query.scalar() or 0 | ||
print('rules.no_of_rules.replicating.%s.replicating_rules_cnt %s' % (a_name, result)) | ||
|
||
replicating_rules_older_than_gauge.labels(**prom_labels).set(result) | ||
print(prom_labels, result) | ||
|
||
probe_metrics.gauge(name='rules.no_of_rules.replicating.{name}.sum_locks_stuck_cnt').labels(name=a_name).set(result) | ||
|
||
# Number of Stuck files | ||
|
||
query = (session.query(func.sum(models.ReplicationRule.locks_stuck_cnt)) | ||
.filter(models.ReplicationRule.state == RuleState.STUCK) | ||
.filter(models.ReplicationRule.created_at <= timeLimit)) | ||
result = query.scalar() or 0 | ||
print('rules.no_of_files.stuck.%s.sum_locks_stuck_cnt %s' % (a_name, result)) | ||
|
||
stuck_locks_older_than_gauge.labels(**prom_labels).set(result) | ||
print(prom_labels, result) | ||
|
||
probe_metrics.gauge(name='rules.no_of_files.stuck.{name}.sum_locks_stuck_cnt', | ||
documentation='Number of stuck files older than X').labels(name=a_name).set(result) | ||
|
||
# Number of Replicating files | ||
|
||
query = (session.query(func.sum(models.ReplicationRule.locks_replicating_cnt)) | ||
.filter(models.ReplicationRule.state.in_([RuleState.STUCK, RuleState.REPLICATING])) | ||
.filter(models.ReplicationRule.created_at <= timeLimit)) | ||
result = query.scalar() or 0 | ||
print('rules.no_of_files.replicating.%s.sum_locks_replicating_cnt %s' % (a_name, result)) | ||
|
||
replicating_locks_older_than_gauge.labels(**prom_labels).set(result) | ||
print(prom_labels, result) | ||
|
||
(probe_metrics.gauge(name='rules.no_of_files.replicating.{name}.sum_locks_replicating_cnt', | ||
documentation='Number of replicating files older than X') | ||
.labels(name=a_name) | ||
.set(result)) | ||
states = {'REPLICATING': RuleState.REPLICATING, | ||
'OK': RuleState.OK, | ||
'INJECT': RuleState.INJECT, | ||
'STUCK': RuleState.STUCK, | ||
'SUSPENDED': RuleState.SUSPENDED, | ||
'WAITING_APPROVAL': RuleState.WAITING_APPROVAL } | ||
|
||
# Number of days which will be used to aggregate rules/locks | ||
older_than_n_days = [1, 7, 21, 90, 180, 365] | ||
|
||
with PrometheusPusher() as manager: | ||
|
||
# Rule count per state | ||
result = (session.query(models.ReplicationRule.state, func.count(models.ReplicationRule.state)) | ||
.group_by(models.ReplicationRule.state) | ||
.with_hint(models.ReplicationRule, 'INDEX_FFS(rules RULES_PK)', 'oracle') | ||
.all()) | ||
|
||
for state, num in result: | ||
manager.gauge(name='rule_count_per_state.{state}', | ||
documentation='Number of rules in a given state').labels(state=str(state.name)).set(num) | ||
|
||
|
||
# Rule count per state and activity | ||
results = (session.query(models.ReplicationRule.state, models.ReplicationRule.activity, func.count(models.ReplicationRule.state)) | ||
.filter(models.ReplicationRule.state != RuleState.OK) | ||
.group_by(models.ReplicationRule.activity, models.ReplicationRule.state) | ||
.with_hint(models.ReplicationRule, 'INDEX_FFS(rules RULES_PK)', 'oracle') | ||
.all()) | ||
|
||
for result in results: | ||
manager.gauge(name='rule_count_per_state_and_activity.{state}.{activity}', | ||
documentation='Number of rules in a given state and activity').labels(state=result[0], activity=result[1]).set(result[2]) | ||
|
||
|
||
# Lock count per state (STUCK and REPLICATING) and activity | ||
results = (session.query(models.ReplicationRule.activity, | ||
func.sum(models.ReplicationRule.locks_stuck_cnt), | ||
func.sum(models.ReplicationRule.locks_replicating_cnt)) | ||
.group_by(models.ReplicationRule.activity) | ||
.with_hint(models.ReplicationRule, 'INDEX_FFS(rules RULES_PK)', 'oracle') | ||
.all()) | ||
|
||
for result in results: | ||
manager.gauge(name='lock_count_per_state_and_activity.{state}.{activity}', | ||
documentation='Number of S/R locks by activity and state').labels(state='stuck', activity=result[0]).set(result[1]) | ||
manager.gauge(name='lock_count_per_state_and_activity.{state}.{activity}', | ||
documentation='Number of S/R locks by activity and state').labels(state='replicating', activity=result[0]).set(result[2]) | ||
|
||
for nDays in older_than_n_days: | ||
age = datetime.datetime.utcnow() - datetime.timedelta(days=nDays) | ||
|
||
for stateName, stateDB in states.items(): | ||
|
||
# Rule count per state and date | ||
query = (session.query(func.count(models.ReplicationRule.id)) | ||
.filter(models.ReplicationRule.state == stateDB) | ||
.filter(models.ReplicationRule.created_at <= age)) | ||
result = query.scalar() or 0 | ||
|
||
manager.gauge(name='rule_count_per_state_and_date.{state}.{older_than_days}', | ||
documentation='Rule count per state and date').labels(state=stateName, older_than_days= nDays ).set(result) | ||
|
||
|
||
# File count per state and date | ||
query = (session.query(func.sum(models.ReplicationRule.locks_stuck_cnt)) | ||
.filter(models.ReplicationRule.state == stateDB) | ||
.filter(models.ReplicationRule.created_at <= age)) | ||
result = query.scalar() or 0 | ||
|
||
manager.gauge(name='file_count_per_state_and_date.{state}.{older_than_days}', | ||
documentation='File count per state and date').labels(state=stateName, older_than_days=nDays).set(result) | ||
|
||
if PROM_SERVERS: | ||
for server in PROM_SERVERS: | ||
try: | ||
push_to_gateway(server.strip(), job='check_rules_count_by_state_by_account', registry=registry) | ||
except: | ||
continue | ||
except: | ||
print(traceback.format_exc()) | ||
sys.exit(UNKNOWN) | ||
|