From dc9b8c3b42ad625058624184fa4ed77fc0184737 Mon Sep 17 00:00:00 2001
From: Hasan Ozturk
Date: Mon, 29 Jan 2024 17:07:37 +0100
Subject: [PATCH] CMS: modernize rule count probe

---
Review notes (text between "---" and the diffstat is ignored by git am):

* This patch was rebuilt from a whitespace-collapsed copy. The number of
  removed/added/context lines matches the original hunk sizes, but the
  indentation and the position of blank context lines are inferred.
  Verify them against blob 78949e2 before applying.
* The added code differs from the original submission, so the post-image
  blob id on the "index" line is stale:
  - rule_count_per_state_and_activity labels "state" with the enum member
    name, like rule_count_per_state does, instead of the raw enum object.
  - per-activity lock sums fall back to 0 when SUM() is NULL.
  - the age cut-offs are all derived from one utcnow() reading.
  - PEP 8 clean-ups (blank lines, keyword spacing, snake_case locals).
* Open question, left unchanged: file_count_per_state_and_date sums
  locks_stuck_cnt for every state (see NOTE(review) in the hunk).

 cms/check_rule_counts | 338 +++++++++++-----------------------------------
 1 file changed, 81 insertions(+), 257 deletions(-)

diff --git a/cms/check_rule_counts b/cms/check_rule_counts
index 78949e2..69053ae 100755
--- a/cms/check_rule_counts
+++ b/cms/check_rule_counts
@@ -1,5 +1,5 @@
 #!/usr/bin/env python3
-# Copyright 2012-2020 CERN
+# Copyright 2012-2024 CERN
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
 # - Donata Mielaikaite, , 2020
 # - Eric Vaandering, , 2021
 # - Fernando Garzon, ogarzonm@cern.ch, 2022
+# - Hasan Ozturk, haozturk AT cern DOT ch, 2024
 
 """
 
@@ -38,273 +39,96 @@ from sqlalchemy import func
 
 from utils import common
 
+PrometheusPusher = common.PrometheusPusher
 probe_metrics = common.probe_metrics
 
-PROM_SERVERS = config_get('monitor', 'prometheus_servers', raise_exception=False, default='')
-if PROM_SERVERS != '':
-    PROM_SERVERS = PROM_SERVERS.split(',')
-else:
-    PROM_SERVERS = None
-
-prom_labels_config = config_get('monitor', 'prometheus_labels', raise_exception=False, default='{}')
-extra_prom_labels = json.loads(prom_labels_config)
-
 # Exit statuses
 OK, WARNING, CRITICAL, UNKNOWN = 0, 1, 2, 3
 
 if __name__ == '__main__':
 
-    registry = CollectorRegistry()
-
-    rule_count_labels = ['state']
-    not_ok_labels = ['Not_ok_rules']
-    stuck_cnt_labels = ['Stuck_cnt']
-    replicating_cnt_labels = ['Replicating_cnt']
-    rules_by_activity_labels = ['state', 'activity']
-    locks_by_activity_labels = ['state', 'activity']
-
-    rule_count_labels.extend(extra_prom_labels.keys())
-    not_ok_labels.extend(extra_prom_labels.keys())
-    stuck_cnt_labels.extend(extra_prom_labels.keys())
-    replicating_cnt_labels.extend(extra_prom_labels.keys())
-    rules_by_activity_labels.extend(extra_prom_labels.keys())
-    locks_by_activity_labels.extend(extra_prom_labels.keys())
-
-    rules_count_gauge = Gauge('rucio_rules_count', 'Number of rules in a given state',
-                              labelnames=rule_count_labels, registry=registry)
-    not_ok_rules_gauge = Gauge('rucio_not_ok_rules', 'Number of not OK rules',
-                               labelnames=not_ok_labels, registry=registry)
-    stuck_cnt_gauge = Gauge('rucio_stuck_cnt', 'Number of stuck files',
-                            labelnames=stuck_cnt_labels, registry=registry)
-    replicating_cnt_gauge = Gauge('rucio_replicating_cnt', 'Number of replicating files',
-                                  labelnames=replicating_cnt_labels, registry=registry)
-    rule_count_by_activity_gauge = Gauge('rucio_rules_states_by_ativity_cnt', 'Number of S/R/U rules by activity',
-                                         labelnames=rules_by_activity_labels, registry=registry)
-    locks_count_by_activity_gauge = Gauge('rucio_locks_states_by_ativity_cnt', 'Number of S/R locks by activity',
-                                          labelnames=locks_by_activity_labels, registry=registry)
     try:
         session = get_session()
 
-        # check rules
-        state_map = {'REPLICATING': 'rules_replicating',
-                     'OK': 'rules_ok',
-                     'INJECT': 'rules_injecting',
-                     'STUCK': 'rules_stuck',
-                     'SUSPENDED': 'rules_suspend',
-                     'WAITING_APPROVAL': 'rules_waiting_approval', }
-
-        ages = {
-            'created_24hours_ago': datetime.timedelta(days=1),
-            'created_1week_ago': datetime.timedelta(days=7),
-            'created_3weeks_ago': datetime.timedelta(days=21),
-            'created_3months_ago': datetime.timedelta(days=90),
-            'created_6months_ago': datetime.timedelta(days=180),
-            'created_12months_ago': datetime.timedelta(days=365),
-        }
-
-        result = (session.query(models.ReplicationRule.state, func.count(models.ReplicationRule.state))
-                  .group_by(models.ReplicationRule.state)
-                  .with_hint(models.ReplicationRule, 'INDEX_FFS(rules RULES_PK)', 'oracle')
-                  .all())
-
-        for state, num in result:
-            # Count Rules in x state
-            gauge_state = state_map.get(str(state.name), 'rules_' + str(state.name).lower())
-            print('rules.count.%s %s' % (gauge_state, num))
-
-            prom_labels = {'state': gauge_state}
-            prom_labels.update(extra_prom_labels)
-            rules_count_gauge.labels(**prom_labels).set(num)
-
-            probe_metrics.gauge(name='rules.count.{state}',
-                                documentation='Number of rules in a given state').labels(state=gauge_state).set(num)
-
-        # Count Rules in x state ordered by activity
-        results = (session.query(models.ReplicationRule.state, models.ReplicationRule.activity, func.count(models.ReplicationRule.state))
-                   .filter(models.ReplicationRule.state != RuleState.OK)
-                   .group_by(models.ReplicationRule.activity, models.ReplicationRule.state)
-                   .with_hint(models.ReplicationRule, 'INDEX_FFS(rules RULES_PK)', 'oracle')
-                   .all())
-
-        for result in results:
-            print(result[0], result[1], result[2])
-            prom_labels = {'state': result[0], 'activity': result[1]}
-            prom_labels.update(extra_prom_labels)
-            rule_count_by_activity_gauge.labels(**prom_labels).set(result[2])
-
-        # Count Locks in S/R state ordered by activity
-        print('Count Locks in S/R state ordered by activity')
-        results = (session.query(models.ReplicationRule.activity,
-                                 func.sum(models.ReplicationRule.locks_stuck_cnt),
-                                 func.sum(models.ReplicationRule.locks_replicating_cnt))
-                   .group_by(models.ReplicationRule.activity)
-                   .with_hint(models.ReplicationRule, 'INDEX_FFS(rules RULES_PK)', 'oracle')
-                   .all())
-
-        for result in results:
-            prom_labels = {'state': 'stuck', 'activity': result[0]}
-            prom_labels.update(extra_prom_labels)
-            locks_count_by_activity_gauge.labels(**prom_labels).set(result[1])
-            prom_labels = {'state': 'replicating', 'activity': result[0]}
-            prom_labels.update(extra_prom_labels)
-            locks_count_by_activity_gauge.labels(**prom_labels).set(result[2])
-
-        # Not Ok rules
-        query = session.query(models.ReplicationRule.scope).filter(models.ReplicationRule.state != RuleState.OK)
-        result = get_count(query)
-
-        prom_labels = {'Not_ok_rules': 'Not_ok_rules'}
-        prom_labels.update(extra_prom_labels)
-        not_ok_rules_gauge.labels(**prom_labels).set(result)
-
-        probe_metrics.gauge(name='judge.total_not_OK_rules', documentation='Number of not OK rules').set(result)
-
-        # Stuck cnt
-        query = (session.query(func.sum(models.ReplicationRule.locks_stuck_cnt))
-                 .filter(models.ReplicationRule.state == RuleState.STUCK))
-        result = query.scalar() or 0
-
-        print('rules.no_of_files.total.sum_locks_stuck_cnt %s' % (result))
-
-        prom_labels = {'Stuck_cnt': 'Stuck_cnt'}
-        prom_labels.update(extra_prom_labels)
-        stuck_cnt_gauge.labels(**prom_labels).set(result)
-
-        probe_metrics.gauge(name='rules.no_of_files.total.sum_locks_stuck_cnt',
-                            documentation='Number of stuck files').set(result)
-
-        # check left replicating files
-        query = (session.query(func.sum(models.ReplicationRule.locks_replicating_cnt))
-                 .filter(models.ReplicationRule.state.in_([RuleState.STUCK, RuleState.REPLICATING])))
-        result = query.scalar() or 0
-
-        print('rules.no_of_files.total.sum_locks_replicating_cnt %s' % (result))
-
-        prom_labels = {'Replicating_cnt': 'Replicating_cnt'}
-        prom_labels.update(extra_prom_labels)
-        replicating_cnt_gauge.labels(**prom_labels).set(result)
-
-        probe_metrics.gauge(name='rules.no_of_files.total.sum_locks_replicating_cnt',
-                            documentation='Number of replicating files').set(result)
-
-        # check stuck and replicating files which are more than X old
-        suspended_rules_older_than_X_labels = ['age']
-        stuck_rules_older_than_X_labels = ['age']
-        replicating_rules_older_than_X_labels = ['age']
-
-        suspended_rules_older_than_X_labels.extend(extra_prom_labels.keys())
-        stuck_rules_older_than_X_labels.extend(extra_prom_labels.keys())
-        replicating_rules_older_than_X_labels.extend(extra_prom_labels.keys())
-
-        stuck_locks_older_than_X_labels = ['age']
-        replicating_locks_older_than_X_labels = ['age']
-
-        stuck_locks_older_than_X_labels.extend(extra_prom_labels.keys())
-        replicating_locks_older_than_X_labels.extend(extra_prom_labels.keys())
-
-        suspended_rules_older_than_gauge = Gauge('rucio_suspended_rules_cnt',
-                                                 'Number of suspended rules older than X',
-                                                 labelnames=suspended_rules_older_than_X_labels,
-                                                 registry=registry)
-        print('suspended_rules_older_than_X_labels', suspended_rules_older_than_X_labels)
-        stuck_rules_older_than_gauge = Gauge('rucio_stuck_rules_cnt', 'Number of stuck rules older than X',
-                                             labelnames=stuck_rules_older_than_X_labels,
-                                             registry=registry)
-
-        replicating_rules_older_than_gauge = Gauge('rucio_replicating_rules_cnt',
-                                                   'Number of replicating rules older than X',
-                                                   labelnames=replicating_rules_older_than_X_labels,
-                                                   registry=registry)
-
-        stuck_locks_older_than_gauge = Gauge('rucio_stuck_locks_cnt', 'Number of stuck files older than X',
-                                             labelnames=stuck_locks_older_than_X_labels,
-                                             registry=registry)
-        replicating_locks_older_than_gauge = Gauge('rucio_replicating_locks_cnt',
-                                                   'Number of replicating files older than X',
-                                                   labelnames=replicating_locks_older_than_X_labels,
-                                                   registry=registry)
-
-        for a_name, a_delta in ages.items():
-            timeLimit = datetime.datetime.utcnow() - a_delta
-
-            # Number of Suspended rules older than x
-
-            query = (session.query(func.count(models.ReplicationRule.id))
-                     .filter(models.ReplicationRule.state == RuleState.SUSPENDED)
-                     .filter(models.ReplicationRule.created_at <= timeLimit))
-            result = query.scalar() or 0
-            print('rules.no_of_rules.suspended.%s.suspended_rules_cnt %s' % (a_name, result))
-
-            prom_labels = {'age': a_name.split('_')[1]}
-            prom_labels.update(extra_prom_labels)
-
-            suspended_rules_older_than_gauge.labels(**prom_labels).set(result)
-            print(prom_labels, result)
-
-            probe_metrics.gauge(name='rules.no_of_rules.suspended.{name}.sum_locks_stuck_cnt').labels(name=a_name).set(result)
-
-            # Number of Stuck rules older than x
-
-            query = (session.query(func.count(models.ReplicationRule.id))
-                     .filter(models.ReplicationRule.state == RuleState.STUCK)
-                     .filter(models.ReplicationRule.created_at <= timeLimit))
-            result = query.scalar() or 0
-            print('rules.no_of_rules.stuck.%s.stuck_rules_cnt %s' % (a_name, result))
-
-            stuck_rules_older_than_gauge.labels(**prom_labels).set(result)
-            print(prom_labels, result)
-
-            probe_metrics.gauge(name='rules.no_of_rules.stuck.{name}.sum_locks_stuck_cnt').labels(name=a_name).set(result)
-
-            # Number of replicating rules older than x
-
-            query = (session.query(func.count(models.ReplicationRule.id))
-                     .filter(models.ReplicationRule.state == RuleState.REPLICATING)
-                     .filter(models.ReplicationRule.created_at <= timeLimit))
-            result = query.scalar() or 0
-            print('rules.no_of_rules.replicating.%s.replicating_rules_cnt %s' % (a_name, result))
-
-            replicating_rules_older_than_gauge.labels(**prom_labels).set(result)
-            print(prom_labels, result)
-
-            probe_metrics.gauge(name='rules.no_of_rules.replicating.{name}.sum_locks_stuck_cnt').labels(name=a_name).set(result)
-
-            # Number of Stuck files
-
-            query = (session.query(func.sum(models.ReplicationRule.locks_stuck_cnt))
-                     .filter(models.ReplicationRule.state == RuleState.STUCK)
-                     .filter(models.ReplicationRule.created_at <= timeLimit))
-            result = query.scalar() or 0
-            print('rules.no_of_files.stuck.%s.sum_locks_stuck_cnt %s' % (a_name, result))
-
-            stuck_locks_older_than_gauge.labels(**prom_labels).set(result)
-            print(prom_labels, result)
-
-            probe_metrics.gauge(name='rules.no_of_files.stuck.{name}.sum_locks_stuck_cnt',
-                                documentation='Number of stuck files older than X').labels(name=a_name).set(result)
-
-            # Number of Replicating files
-
-            query = (session.query(func.sum(models.ReplicationRule.locks_replicating_cnt))
-                     .filter(models.ReplicationRule.state.in_([RuleState.STUCK, RuleState.REPLICATING]))
-                     .filter(models.ReplicationRule.created_at <= timeLimit))
-            result = query.scalar() or 0
-            print('rules.no_of_files.replicating.%s.sum_locks_replicating_cnt %s' % (a_name, result))
-
-            replicating_locks_older_than_gauge.labels(**prom_labels).set(result)
-            print(prom_labels, result)
-
-            (probe_metrics.gauge(name='rules.no_of_files.replicating.{name}.sum_locks_replicating_cnt',
-                                 documentation='Number of replicating files older than X')
-             .labels(name=a_name)
-             .set(result))
+        # Rule states covered by the per-age metrics, keyed by their label value
+        states = {
+            'REPLICATING': RuleState.REPLICATING,
+            'OK': RuleState.OK,
+            'INJECT': RuleState.INJECT,
+            'STUCK': RuleState.STUCK,
+            'SUSPENDED': RuleState.SUSPENDED,
+            'WAITING_APPROVAL': RuleState.WAITING_APPROVAL,
+        }
+
+        # Number of days which will be used to aggregate rules/locks
+        older_than_n_days = [1, 7, 21, 90, 180, 365]
+
+        with PrometheusPusher() as manager:
+
+            # Rule count per state
+            result = (session.query(models.ReplicationRule.state, func.count(models.ReplicationRule.state))
+                      .group_by(models.ReplicationRule.state)
+                      .with_hint(models.ReplicationRule, 'INDEX_FFS(rules RULES_PK)', 'oracle')
+                      .all())
+
+            for state, num in result:
+                manager.gauge(name='rule_count_per_state.{state}',
+                              documentation='Number of rules in a given state').labels(state=str(state.name)).set(num)
+
+            # Rule count per state and activity
+            results = (session.query(models.ReplicationRule.state, models.ReplicationRule.activity, func.count(models.ReplicationRule.state))
+                       .filter(models.ReplicationRule.state != RuleState.OK)
+                       .group_by(models.ReplicationRule.activity, models.ReplicationRule.state)
+                       .with_hint(models.ReplicationRule, 'INDEX_FFS(rules RULES_PK)', 'oracle')
+                       .all())
+
+            for state, activity, num in results:
+                # Label with the state name, exactly as the per-state metric above does
+                manager.gauge(name='rule_count_per_state_and_activity.{state}.{activity}',
+                              documentation='Number of rules in a given state and activity').labels(state=str(state.name), activity=activity).set(num)
+
+            # Lock count per state (STUCK and REPLICATING) and activity
+            results = (session.query(models.ReplicationRule.activity,
+                                     func.sum(models.ReplicationRule.locks_stuck_cnt),
+                                     func.sum(models.ReplicationRule.locks_replicating_cnt))
+                       .group_by(models.ReplicationRule.activity)
+                       .with_hint(models.ReplicationRule, 'INDEX_FFS(rules RULES_PK)', 'oracle')
+                       .all())
+
+            for activity, stuck_locks, replicating_locks in results:
+                # SUM() is NULL when every summed value is NULL; publish 0 rather than None
+                manager.gauge(name='lock_count_per_state_and_activity.{state}.{activity}',
+                              documentation='Number of S/R locks by activity and state').labels(state='stuck', activity=activity).set(stuck_locks or 0)
+                manager.gauge(name='lock_count_per_state_and_activity.{state}.{activity}',
+                              documentation='Number of S/R locks by activity and state').labels(state='replicating', activity=activity).set(replicating_locks or 0)
+
+            # One reference instant, so every age bucket is cut from the same "now"
+            now = datetime.datetime.utcnow()
+            for n_days in older_than_n_days:
+                age = now - datetime.timedelta(days=n_days)
+
+                for state_name, rule_state in states.items():
+
+                    # Rule count per state and date
+                    query = (session.query(func.count(models.ReplicationRule.id))
+                             .filter(models.ReplicationRule.state == rule_state)
+                             .filter(models.ReplicationRule.created_at <= age))
+                    result = query.scalar() or 0
+
+                    manager.gauge(name='rule_count_per_state_and_date.{state}.{older_than_days}',
+                                  documentation='Rule count per state and date').labels(state=state_name, older_than_days=n_days).set(result)
+
+                    # File count per state and date
+                    # NOTE(review): sums locks_stuck_cnt for every state; the probe code removed
+                    # above summed locks_replicating_cnt for replicating files - confirm intent.
+                    query = (session.query(func.sum(models.ReplicationRule.locks_stuck_cnt))
+                             .filter(models.ReplicationRule.state == rule_state)
+                             .filter(models.ReplicationRule.created_at <= age))
+                    result = query.scalar() or 0
+
+                    manager.gauge(name='file_count_per_state_and_date.{state}.{older_than_days}',
+                                  documentation='File count per state and date').labels(state=state_name, older_than_days=n_days).set(result)
 
-        if PROM_SERVERS:
-            for server in PROM_SERVERS:
-                try:
-                    push_to_gateway(server.strip(), job='check_rules_count_by_state_by_account', registry=registry)
-                except:
-                    continue
     except:
         print(traceback.format_exc())
         sys.exit(UNKNOWN)