From 3ed3fffaa974d8622c060c01de766e378afa47f1 Mon Sep 17 00:00:00 2001 From: Nicolai Buchwitz Date: Sun, 12 May 2024 12:51:17 +0200 Subject: [PATCH] style: Make linter happy Rework some code to make linter happy (esp. use f strings). Signed-off-by: Nicolai Buchwitz --- check_pve.py | 433 ++++++++++++++++++++++++++++----------------------- 1 file changed, 238 insertions(+), 195 deletions(-) diff --git a/check_pve.py b/check_pve.py index 98bf5e5..bdbb598 100755 --- a/check_pve.py +++ b/check_pve.py @@ -23,22 +23,51 @@ # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. # ------------------------------------------------------------------------------ -import sys +"""Proxmox VE monitoring check command for various monitoring systems like Icinga and others.""" + import re +import sys +from typing import Callable, Dict, Optional, Union try: - from enum import Enum - from datetime import datetime - from packaging import version import argparse + from datetime import datetime + from enum import Enum + import requests + from packaging import version + from requests.packages.urllib3.exceptions import InsecureRequestWarning except ImportError as e: - print("Missing python module: {}".format(str(e))) + print(f"Missing python module: {str(e)}") sys.exit(255) +# Timeout for API requests in seconds +CHECK_API_TIMEOUT = 30 + + +def compare_thresholds( + threshold_warning: Dict, threshold_critical: Dict, comparator: Callable +) -> bool: + """Perform sanity checks on thresholds parameters (used for argparse validation).""" + ok = True + keys = set(list(threshold_warning.keys()) + list(threshold_critical.keys())) + for key in keys: + if (key in threshold_warning and key in threshold_critical) or ( + None in threshold_warning and None in threshold_critical + ): + ok = ok and comparator(threshold_warning[key], threshold_critical[key]) + elif key in threshold_warning and None in threshold_critical: + ok = ok and comparator(threshold_warning[key], threshold_critical[None]) + elif key in threshold_critical and None in threshold_warning: + ok = ok and comparator(threshold_warning[None], threshold_critical[key]) + + return ok + class CheckState(Enum): + """Check return values.""" + OK = 0 WARNING = 1 CRITICAL = 2 @@ -46,49 +75,60 @@ class CheckState(Enum): class CheckThreshold: - def __init__(self, value: float): + """Threshold representation used by the check command.""" + + def __init__(self, value: float) -> None: self.value = value - def __eq__(self, other): + def __eq__(self, other: "CheckThreshold") -> bool: + """Threshold is equal to given one.""" return self.value == other.value - def __lt__(self, other): + def __lt__(self, other: "CheckThreshold") -> bool: + """Threshold is lower to given one.""" return self.value < other.value - def __le__(self, other): + def __le__(self, other: "CheckThreshold") -> bool: + """Threshold is lower or equal to given one.""" return self.value <= other.value - def __gt__(self, other): + def __gt__(self, other: "CheckThreshold") -> bool: + """Threshold is greater than given one.""" return self.value > other.value - def __ge__(self, other): + def __ge__(self, other: "CheckThreshold") -> bool: + """Threshold is greater or equal than given one.""" return self.value >= other.value - def check(self, value: float, lower: bool = False): + def check(self, value: float, lower: bool = False) -> bool: + """Check threshold value as upper or lower boundary for given value.""" if lower: return value < self.value - else: - return value > self.value + + return value > self.value @staticmethod - def threshold_type(arg: str): + def threshold_type(arg: str) -> Dict[str, "CheckThreshold"]: + """Convert string argument(s) to threshold dict.""" thresholds = {} try: thresholds[None] = CheckThreshold(float(arg)) - except: + except ValueError: for t in arg.split(","): m = re.match("([a-z_0-9]+):([0-9.]+)", t) if m: thresholds[m.group(1)] = CheckThreshold(float(m.group(2))) else: - raise argparse.ArgumentTypeError("invalid threshold format: {}".format(t)) + raise argparse.ArgumentTypeError(f"Invalid threshold format: {t}") # noqa: B904 return thresholds class CheckPVE: + """Check command for Proxmox VE.""" + VERSION = "1.2.2" API_URL = "https://{hostname}:{port}/api2/json/{command}" UNIT_SCALE = { @@ -101,7 +141,8 @@ class CheckPVE: "B": 1, } - def check_output(self): + def check_output(self) -> None: + """Print check command output with perfdata and return code.""" message = self.check_message if self.perfdata: message += self.get_perfdata() @@ -109,19 +150,20 @@ def check_output(self): self.output(self.check_result, message) @staticmethod - def output(rc, message): + def output(rc: CheckState, message: str) -> None: + """Print message to stdout and exit with given return code.""" prefix = rc.name - message = "{} - {}".format(prefix, message) - - print(message) + print(f"{prefix} - {message}") sys.exit(rc.value) - def get_url(self, command): + def get_url(self, command: str) -> str: + """Get API url for specific command.""" return self.API_URL.format( hostname=self.options.api_endpoint, command=command, port=self.options.api_port ) - def request(self, url, method="get", **kwargs): + def request(self, url: str, method: str = "get", **kwargs: Dict) -> Union[Dict, None]: + """Execute request against Proxmox VE API and return json data.""" response = None try: if method == "post": @@ -138,9 +180,10 @@ def request(self, url, method="get", **kwargs): cookies=self.__cookies, headers=self.__headers, params=kwargs.get("params", None), + timeout=CHECK_API_TIMEOUT, ) else: - self.output(CheckState.CRITICAL, "Unsupport request method: {}".format(method)) + self.output(CheckState.CRITICAL, f"Unsupport request method: {method}") except requests.exceptions.ConnectTimeout: self.output(CheckState.UNKNOWN, "Could not connect to PVE API: Connection timeout") except requests.exceptions.SSLError: @@ -154,29 +197,30 @@ def request(self, url, method="get", **kwargs): if response.ok: return response.json()["data"] + + message = "Could not fetch data from API: " + if response.status_code == 401: + message += "Could not connection to PVE API: invalid username or password" + elif response.status_code == 403: + message += ( + "Access denied. Please check if API user has sufficient permissions / " + "the correct role has been assigned." + ) else: - message = "Could not fetch data from API: " - - if response.status_code == 401: - message += "Could not connection to PVE API: invalid username or password" - elif response.status_code == 403: - message += ( - "Access denied. Please check if API user has sufficient permissions / the role has been " - "assigned." - ) - else: - message += "HTTP error code was {}".format(response.status_code) + message += f"HTTP error code was {response.status_code}" - self.output(CheckState.UNKNOWN, message) + self.output(CheckState.UNKNOWN, message) - def get_ticket(self): + def get_ticket(self) -> str: + """Perform login and fetch ticket for further API calls.""" url = self.get_url("access/ticket") data = {"username": self.options.api_user, "password": self.options.api_password} result = self.request(url, "post", data=data) return result["ticket"] - def check_api_value(self, url, message, **kwargs): + def check_api_value(self, url: StopIteration, message: str, **kwargs: Dict) -> None: + """Perform simple threshold based check command.""" result = self.request(url) used = None @@ -197,15 +241,16 @@ def check_api_value(self, url, message, **kwargs): self.add_perfdata(kwargs.get("perfkey", "usage"), used_percent) if self.options.values_mb: - message += " {} {}".format(used, self.options.unit) + message += f" {used} {self.options.unit}" value = used else: - message += " {} {}".format(used_percent, "%") + message += f" {used_percent} %" value = used_percent self.check_thresholds(value, message) - def check_vm_status(self, idx, **kwargs): + def check_vm_status(self, idx: Union[str, int], **kwargs: str) -> None: + """Check status of virtual machine by vmid or name.""" url = self.get_url( "cluster/resources", ) @@ -223,22 +268,21 @@ def check_vm_status(self, idx, **kwargs): vm_type = "LXC" if vm["status"] != expected_state: - self.check_message = "{} '{}' is {} (expected: {})".format( - vm_type, vm["name"], vm["status"], expected_state + self.check_message = ( + f"{vm_type} '{vm['name']}' is {vm['status']} (expected: {expected_state})" ) if not self.options.ignore_vm_status: self.check_result = CheckState.CRITICAL else: if self.options.node and self.options.node != vm["node"]: self.check_message = ( - "{} '{}' is {}, but located on node '{}' instead of '{}'".format( - vm_type, vm["name"], expected_state, vm["node"], self.options.node - ) + f"{vm_type} '{vm['name']}' is {expected_state}, " + f"but located on node '{vm['node']}' instead of '{self.options.node}'" ) self.check_result = CheckState.WARNING else: - self.check_message = "{} '{}' is {} on node '{}'".format( - vm_type, vm["name"], expected_state, vm["node"] + self.check_message = ( + f"{vm_type} '{vm['name']}' is {expected_state} on node '{vm['node']}'" ) if vm["status"] == "running" and not only_status: @@ -275,11 +319,12 @@ def check_vm_status(self, idx, **kwargs): break if not found: - self.check_message = "VM or LXC '{}' not found".format(idx) + self.check_message = f"VM or LXC '{idx}' not found" self.check_result = CheckState.WARNING - def check_disks(self): - url = self.get_url("nodes/{}/disks".format(self.options.node)) + def check_disks(self) -> None: + """Check disk health on specific Proxmox VE node.""" + url = self.get_url(f"nodes/{self.options.node}/disks") failed = [] unknown = [] @@ -299,31 +344,26 @@ def check_disks(self): failed.append({"serial": disk["serial"], "device": disk["devpath"]}) if disk["wearout"] != "N/A": - self.add_perfdata("wearout_{}".format(name), disk["wearout"]) + self.add_perfdata(f"wearout_{name}", disk["wearout"]) if failed: - self.check_message = "{} of {} disks failed the health test:\n".format( - len(failed), len(disks) - ) + self.check_message = f"{len(failed)} of {len(disks)} disks failed the health test:\n" for disk in failed: - self.check_message += "- {} with serial '{}'\n".format( - disk["device"], disk["serial"] - ) + self.check_message += f"- {disk['device']} with serial '{disk['serial']}'\n" if unknown: - self.check_message += "{} of {} disks have unknown health status:\n".format( - len(unknown), len(disks) + self.check_message += ( + f"{len(unknown)} of {len(disks)} disks have unknown health status:\n" ) for disk in unknown: - self.check_message += "- {} with serial '{}'\n".format( - disk["device"], disk["serial"] - ) + self.check_message += f"- {disk['device']} with serial '{disk['serial']}'\n" if not failed and not unknown: self.check_message = "All disks are healthy" - def check_replication(self): - url = self.get_url("nodes/{}/replication".format(self.options.node)) + def check_replication(self) -> None: + """Check replication status for either all or one specific vm / container.""" + url = self.get_url(f"nodes/{self.options.node}/replication") if self.options.vmid: data = self.request(url, params={"guest": self.options.vmid}) @@ -342,7 +382,7 @@ def check_replication(self): performance_data.append({"id": job["id"], "duration": job["duration"]}) if len(failed_jobs) > 0: - message = "Failed replication jobs on {}: ".format(self.options.node) + message = f"Failed replication jobs on {self.options.node}: " for job in failed_jobs: message = ( message @@ -353,15 +393,16 @@ def check_replication(self): self.check_message = message self.check_result = CheckState.WARNING else: - self.check_message = "No failed replication jobs on {}".format(self.options.node) + self.check_message = f"No failed replication jobs on {self.options.node}" self.check_result = CheckState.OK if len(performance_data) > 0: for metric in performance_data: self.add_perfdata("duration_" + metric["id"], metric["duration"], unit="s") - def check_services(self): - url = self.get_url("nodes/{}/services".format(self.options.node)) + def check_services(self) -> None: + """Check state of core services on Proxmox VE node.""" + url = self.get_url(f"nodes/{self.options.node}/services") data = self.request(url) failed = {} @@ -375,14 +416,16 @@ def check_services(self): if failed: self.check_result = CheckState.CRITICAL - message = "{} services are not running:\n\n".format(len(failed)) - message += "\n".join(["- {} ({}) is not running".format(failed[i], i) for i in failed]) + message = f"{len(failed)} services are not running:\n\n" + for name, description in failed.items(): + message += f"- {description} ({name}) is not running\n" self.check_message = message else: self.check_message = "All services are running" - def check_subscription(self): - url = self.get_url("nodes/{}/subscription".format(self.options.node)) + def check_subscription(self) -> None: + """Check subscription status on Proxmox VE node.""" + url = self.get_url(f"nodes/{self.options.node}/subscription") data = self.request(url) if data["status"] == "NotFound": @@ -399,11 +442,9 @@ def check_subscription(self): date_today = datetime.today() delta = (date_expire - date_today).days - message = "{} is valid until {}".format( - subscription_product_name, subscription_due_date - ) - message_warning_critical = "{} will expire in {} days ({})".format( - subscription_product_name, delta, subscription_due_date + message = f"{subscription_product_name} is valid until {subscription_due_date}" + message_warning_critical = ( + "{subscription_product_name} will expire in {delta} days ({subscription_due_date})" ) self.check_thresholds( @@ -414,8 +455,9 @@ def check_subscription(self): lowerValue=True, ) - def check_updates(self): - url = self.get_url("nodes/{}/apt/update".format(self.options.node)) + def check_updates(self) -> None: + """Check for package updates on Proxmox VE node.""" + url = self.get_url(f"nodes/{self.options.node}/apt/update") count = len(self.request(url)) if count: @@ -427,7 +469,8 @@ def check_updates(self): else: self.check_message = "System up to date" - def check_cluster_status(self): + def check_cluster_status(self) -> None: + """Check if cluster is operational.""" url = self.get_url("cluster/status") data = self.request(url) @@ -450,11 +493,9 @@ def check_cluster_status(self): if node_count > nodes_online_count: diff = node_count - nodes_online_count self.check_result = CheckState.WARNING - self.check_message = "Cluster '{}' is healthy, but {} node(s) offline'".format( - cluster, diff - ) + self.check_message = f"Cluster '{cluster}' is healthy, but {diff} node(s) offline'" else: - self.check_message = "Cluster '{}' is healthy'".format(cluster) + self.check_message = f"Cluster '{cluster}' is healthy'" self.add_perfdata("nodes_total", node_count, unit="") self.add_perfdata("nodes_online", nodes_online_count, unit="") @@ -462,8 +503,9 @@ def check_cluster_status(self): self.check_result = CheckState.CRITICAL self.check_message = "Cluster is unhealthy - no quorum" - def check_zfs_fragmentation(self, name=None): - url = self.get_url("nodes/{}/disks/zfs".format(self.options.node)) + def check_zfs_fragmentation(self, name: Optional[str] = None) -> None: + """Check all or one specific ZFS pool for fragmentation.""" + url = self.get_url(f"nodes/{self.options.node}/disks/zfs") data = self.request(url) warnings = [] @@ -474,10 +516,10 @@ def check_zfs_fragmentation(self, name=None): if (name is not None and name == pool["name"]) or name is None: key = "fragmentation" if name is None: - key += "_{}".format(pool["name"]) + key += f"_{pool['name']}" self.add_perfdata(key, pool["frag"]) - threshold_name = "fragmentation_{}".format(pool["name"]) + threshold_name = f"fragmentation_{name}" threshold_warning = self.threshold_warning(threshold_name) threshold_critical = self.threshold_critical(threshold_name) @@ -492,7 +534,7 @@ def check_zfs_fragmentation(self, name=None): if not found: self.check_result = CheckState.UNKNOWN - self.check_message = "Could not fetch fragmentation of ZFS pool '{}'".format(name) + self.check_message = f"Could not fetch fragmentation of ZFS pool '{name}'" else: if warnings or critical: value = None @@ -507,36 +549,31 @@ def check_zfs_fragmentation(self, name=None): if name is not None: self.check_message = ( - "Fragmentation of ZFS pool '{}' is above thresholds: {} %".format( - name, value - ) + f"Fragmentation of ZFS pool '{name}' is above thresholds: {value} %" ) else: - message = "{} of {} ZFS pools are above fragmentation thresholds:\n\n".format( - len(warnings) + len(critical), len(data) + pool_above = len(warnings) + len(critical) + message = ( + f"{pool_above} of {len(data)} ZFS pools are above fragmentation " + "thresholds:\n\n" ) message += "\n".join( - [ - "- {} ({} %) is CRITICAL\n".format(pool["name"], pool["frag"]) - for pool in critical - ] + [f"- {pool['name']} ({pool['frag']} %) is CRITICAL\n" for pool in critical] ) message += "\n".join( - [ - "- {} ({} %) is WARNING\n".format(pool["name"], pool["frag"]) - for pool in warnings - ] + [f"- {pool['name']} ({pool['frag']} %) is WARNING\n" for pool in warnings] ) self.check_message = message else: self.check_result = CheckState.OK if name is not None: - self.check_message = "Fragmentation of ZFS pool '{}' is OK".format(name) + self.check_message = f"Fragmentation of ZFS pool '{name}' is OK" else: self.check_message = "Fragmentation of all ZFS pools is OK" - def check_zfs_health(self, name=None): - url = self.get_url("nodes/{}/disks/zfs".format(self.options.node)) + def check_zfs_health(self, name: Optional[str] = None) -> None: + """Check all or one specific ZFS pool for health.""" + url = self.get_url(f"nodes/{self.options.node}/disks/zfs") data = self.request(url) unhealthy = [] @@ -550,26 +587,24 @@ def check_zfs_health(self, name=None): if not found: self.check_result = CheckState.UNKNOWN - self.check_message = "Could not fetch health of ZFS pool '{}'".format(name) + self.check_message = f"Could not fetch health of ZFS pool '{name}'" else: if unhealthy: self.check_result = CheckState.CRITICAL - message = "{} ZFS pools are not healthy:\n\n".format(len(unhealthy)) + message = f"{len(unhealthy)} ZFS pools are not healthy:\n\n" message += "\n".join( - [ - "- {} ({}) is not healthy".format(pool["name"], pool["health"]) - for pool in unhealthy - ] + [f"- {pool['name']} ({pool['health']}) is not healthy" for pool in unhealthy] ) self.check_message = message else: self.check_result = CheckState.OK if name is not None: - self.check_message = "ZFS pool '{}' is healthy".format(name) + self.check_message = f"ZFS pool '{name}' is healthy" else: self.check_message = "All ZFS pools are healthy" - def check_ceph_health(self): + def check_ceph_health(self) -> None: + """Check health of CEPH cluster.""" url = self.get_url("cluster/ceph/status") data = self.request(url) ceph_health = data.get("health", {}) @@ -595,22 +630,21 @@ def check_ceph_health(self): self.check_result = CheckState.UNKNOWN self.check_message = "Ceph Cluster is in unknown state" - def check_storage(self, name): - # check if storage exists - url = self.get_url("nodes/{}/storage".format(self.options.node)) + def check_storage(self, name: str) -> None: + """Check if storage exists and return usage.""" + url = self.get_url(f"nodes/{self.options.node}/storage") data = self.request(url) if not any(s["storage"] == name for s in data): self.check_result = CheckState.CRITICAL - self.check_message = "Storage '{}' doesn't exist on node '{}'".format( - name, self.options.node - ) + self.check_message = f"Storage '{name}' doesn't exist on node '{self.options.node}'" return - url = self.get_url("nodes/{}/storage/{}/status".format(self.options.node, name)) - self.check_api_value(url, "Usage of storage '{}' is".format(name)) + url = self.get_url(f"nodes/{self.options.node}/storage/{name}/status") + self.check_api_value(url, f"Usage of storage '{name}' is") - def check_version(self): + def check_version(self) -> None: + """Check PVE version.""" url = self.get_url("version") data = self.request(url) if not data["version"]: @@ -621,40 +655,44 @@ def check_version(self): ): self.check_result = CheckState.CRITICAL self.check_message = ( - "Current pve version '{}' ({}) is lower than the min. required version '{}'".format( - data["version"], data["repoid"], self.options.min_version - ) + f"Current PVE version '{data['version']}' " + f"({data['repoid']}) is lower than the min. " + f"required version '{self.options.min_version}'" ) else: - self.check_message = "Your pve instance version '{}' ({}) is up to date".format( - data["version"], data["repoid"] + self.check_message = ( + f"Your PVE instance version '{data['version']}' ({data['repoid']}) is up to date" ) - def check_vzdump_backup(self, name=None): + def check_vzdump_backup(self, name: Optional[str] = None) -> None: + """Check for failed vzdump backup jobs.""" tasks_url = self.get_url("cluster/tasks") tasks = self.request(tasks_url) tasks = [t for t in tasks if t["type"] == "vzdump"] + # Filter by node id, if one is provided if self.options.node is not None: tasks = [t for t in tasks if t["node"] == self.options.node] + # Filter by timestamp, if provided delta = self.threshold_critical("delta") if delta is not None: - now = datetime.utcnow().timestamp() + now = datetime.now(datetime.UTC).timestamp() + tasks = [t for t in tasks if not delta.check(now - t["starttime"])] + # absent status = job still running tasks = [t for t in tasks if "status" in t] failed = len([t for t in tasks if t["status"] != "OK"]) success = len(tasks) - failed - self.check_message = "{} backup tasks successful, {} backup tasks failed".format( - success, failed - ) + self.check_message = f"{success} backup tasks successful, {failed} backup tasks failed" + if failed > 0: self.check_result = CheckState.CRITICAL else: self.check_result = CheckState.OK if delta is not None: - self.check_message += " within the last {}s".format(delta.value) + self.check_message += f" within the last {delta.value}s" nbu_url = self.get_url("cluster/backup-info/not-backed-up") not_backed_up = self.request(nbu_url) @@ -663,33 +701,43 @@ def check_vzdump_backup(self, name=None): if self.check_result not in [CheckState.CRITICAL, CheckState.UNKNOWN]: self.check_result = CheckState.WARNING self.check_message += ( - "\nThere are guests not covered by any backup schedule: {}".format(guest_ids) + f"\nThere are guests not covered by any backup schedule: {guest_ids}" ) - def check_memory(self): - url = self.get_url("nodes/{}/status".format(self.options.node)) + def check_memory(self) -> None: + """Check memory usage of Proxmox VE node.""" + url = self.get_url(f"nodes/{self.options.node}/status") self.check_api_value(url, "Memory usage is", key="memory") - def check_swap(self): - url = self.get_url("nodes/{}/status".format(self.options.node)) + def check_swap(self) -> None: + """Check swap usage of Proxmox VE node.""" + url = self.get_url(f"nodes/{self.options.node}/status") self.check_api_value(url, "Swap usage is", key="swap") - def check_cpu(self): - url = self.get_url("nodes/{}/status".format(self.options.node)) + def check_cpu(self) -> None: + """Check cpu usage of Proxmox VE node.""" + url = self.get_url(f"nodes/{self.options.node}/status") self.check_api_value(url, "CPU usage is", key="cpu") - def check_io_wait(self): - url = self.get_url("nodes/{}/status".format(self.options.node)) + def check_io_wait(self) -> None: + """Check io wait of Proxmox VE node.""" + url = self.get_url(f"nodes/{self.options.node}/status") self.check_api_value(url, "IO wait is", key="wait", perfkey="wait") - def check_thresholds(self, value, message, **kwargs): + def check_thresholds( + self, + values: Union[Dict[str, Union[int, float]], Union[int, float]], + message: str, + **kwargs: Dict, + ) -> None: + """Check numeric value against threshold for given metric name.""" is_warning = False is_critical = False - if not isinstance(value, dict): - value = {None: value} + if not isinstance(values, dict): + values = {None: values} - for metric, value in value.items(): + for metric, value in values.items(): value_warning = self.threshold_warning(metric) if value_warning is not None: is_warning = is_warning or value_warning.check( @@ -711,23 +759,29 @@ def check_thresholds(self, value, message, **kwargs): else: self.check_message = message - def scale_value(self, value): + def scale_value(self, value: Union[int, float]) -> float: + """Scale value according to unit.""" if self.options.unit in self.UNIT_SCALE: return value / self.UNIT_SCALE[self.options.unit] - else: - assert "wrong unit" - def threshold_warning(self, name: str): + raise ValueError("wrong unit") + + def threshold_warning(self, name: str) -> CheckThreshold: + """Get warning threshold for metric name (empty if none).""" return self.options.threshold_warning.get( name, self.options.threshold_warning.get(None, None) ) - def threshold_critical(self, name: str): + def threshold_critical(self, name: str) -> CheckThreshold: + """Get critical threshold for metric name (empty if none).""" return self.options.threshold_critical.get( name, self.options.threshold_critical.get(None, None) ) - def get_value(self, value, total=None): + def get_value( + self, value: Union[int, float], total: Optional[Union[int, float]] = None + ) -> float: + """Get value scaled or as percentage.""" value = float(value) if total: @@ -737,10 +791,11 @@ def get_value(self, value, total=None): return round(value, 2) - def add_perfdata(self, name, value, **kwargs): + def add_perfdata(self, name: str, value: Union[int, float], **kwargs: Dict) -> None: + """Add metric to perfdata output.""" unit = kwargs.get("unit", "%") - perfdata = "{}={}{}".format(name, value, unit) + perfdata = f"{name}={value}{unit}" threshold_warning = self.threshold_warning(name) threshold_critical = self.threshold_critical(name) @@ -753,21 +808,23 @@ def add_perfdata(self, name, value, **kwargs): if threshold_critical: perfdata += str(threshold_critical.value) - perfdata += ";{}".format(kwargs.get("min", 0)) - perfdata += ";{}".format(kwargs.get("max", "")) + perfdata += ";" + str(kwargs.get("min", 0)) + perfdata += ";" + str(kwargs.get("max", "")) self.perfdata.append(perfdata) - def get_perfdata(self): + def get_perfdata(self) -> str: + """Get perfdata string.""" perfdata = "" - if len(self.perfdata): + if self.perfdata: perfdata = "|" perfdata += " ".join(self.perfdata) return perfdata - def check(self): + def check(self) -> None: + """Execute the real check command.""" self.check_result = CheckState.OK if self.options.mode == "cluster": @@ -817,12 +874,13 @@ def check(self): elif self.options.mode == "backup": self.check_vzdump_backup(self.options.name) else: - message = "Check mode '{}' not known".format(self.options.mode) + message = f"Check mode '{self.options.mode}' not known" self.output(CheckState.UNKNOWN, message) self.check_output() - def parse_args(self): + def parse_args(self) -> None: + """Parse CLI arguments.""" p = argparse.ArgumentParser(description="Check command for PVE hosts via API") api_opts = p.add_argument_group("API Options") @@ -953,14 +1011,20 @@ def parse_args(self): dest="threshold_critical", type=CheckThreshold.threshold_type, default={}, - help="Critical threshold for check value. Mutiple thresholds with name:value,name:value", + help=( + "Critical threshold for check value. " + "Mutiple thresholds with name:value,name:value" + ), ) check_opts.add_argument( "-M", dest="values_mb", action="store_true", default=False, - help="Values are shown in the unit which is set with --unit (if available). Thresholds are also treated in this unit", + help=( + "Values are shown in the unit which is set with --unit (if available). " + "Thresholds are also treated in this unit" + ), ) check_opts.add_argument( "-V", @@ -989,40 +1053,22 @@ def parse_args(self): "backup", ]: p.print_usage() - message = "{}: error: --mode {} requires node name (--node)".format( - p.prog, options.mode - ) + message = f"{p.prog}: error: --mode {options.mode} requires node name (--node)" self.output(CheckState.UNKNOWN, message) if not options.vmid and not options.name and options.mode in ("vm", "vm_status"): p.print_usage() - message = "{}: error: --mode {} requires either vm name (--name) or id (--vmid)".format( - p.prog, options.mode + message = ( + f"{p.prog}: error: --mode {options.mode} requires either " + "vm name (--name) or id (--vmid)", ) self.output(CheckState.UNKNOWN, message) if not options.name and options.mode == "storage": p.print_usage() - message = "{}: error: --mode {} requires storage name (--name)".format( - p.prog, options.mode - ) + message = f"{p.prog}: error: --mode {options.mode} requires storage name (--name)" self.output(CheckState.UNKNOWN, message) - def compare_thresholds(threshold_warning, threshold_critical, comparator): - ok = True - keys = set(list(threshold_warning.keys()) + list(threshold_critical.keys())) - for key in keys: - if (key in threshold_warning and key in threshold_critical) or ( - None in threshold_warning and None in threshold_critical - ): - ok = ok and comparator(threshold_warning[key], threshold_critical[key]) - elif key in threshold_warning and None in threshold_critical: - ok = ok and comparator(threshold_warning[key], threshold_critical[None]) - elif key in threshold_critical and None in threshold_warning: - ok = ok and comparator(threshold_warning[None], threshold_critical[key]) - - return ok - if options.threshold_warning and options.threshold_critical: if options.mode != "subscription" and not compare_thresholds( options.threshold_warning, options.threshold_critical, lambda w, c: w <= c @@ -1035,7 +1081,7 @@ def compare_thresholds(threshold_warning, threshold_critical, comparator): self.options = options - def __init__(self): + def __init__(self) -> None: self.options = {} self.ticket = None self.perfdata = [] @@ -1049,16 +1095,13 @@ def __init__(self): if self.options.api_insecure: # disable urllib3 warning about insecure requests - requests.packages.urllib3.disable_warnings( - requests.packages.urllib3.exceptions.InsecureRequestWarning - ) + requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning) if self.options.api_password is not None: self.__cookies["PVEAuthCookie"] = self.get_ticket() elif self.options.api_token is not None: - self.__headers["Authorization"] = "PVEAPIToken={}!{}".format( - self.options.api_user, self.options.api_token - ) + token = f"{self.options.api_user}!{self.options.api_token}" + self.__headers["Authorization"] = f"PVEAPIToken={token}" pve = CheckPVE()