From b2314cfad3550be46f0b5a76dc891cce0416dc80 Mon Sep 17 00:00:00 2001 From: tombo Date: Sun, 30 Jun 2024 16:37:18 +0200 Subject: [PATCH] support for labels --- prometheus_es_exporter/__init__.py | 40 ++++++++++++++++++++++++------ 1 file changed, 32 insertions(+), 8 deletions(-) diff --git a/prometheus_es_exporter/__init__.py b/prometheus_es_exporter/__init__.py index 8d5918d..e5071b8 100644 --- a/prometheus_es_exporter/__init__.py +++ b/prometheus_es_exporter/__init__.py @@ -4,7 +4,9 @@ import configparser import glob import json +import warnings import logging +import urllib3 import os import sched import time @@ -421,7 +423,7 @@ def conv(value): return conv -def kubeOperator(config_dir, es_cluster): +def kubeOperator(config_dir, es_cluster, opt_labels, queries_only): kubeconf_loaded = False try: kubernetes.config.load_incluster_config() @@ -446,7 +448,12 @@ def kubeOperator(config_dir, es_cluster): watch = kubernetes.watch.Watch() - def esqueries_worker_run(watch_api, config_dir, customObjectsApi, group, vers, customResource): + def filter_labels(cr_labels, opt_labels): + labels_list = [f"{key}={value}" for key, value in cr_labels.items()] + return any(label in labels_list for label in opt_labels) + + + def esqueries_worker_run(watch_api, config_dir, customObjectsApi, group, vers, customResource, opt_labels): log.info("WORKER esqueries_worker_run RUNNING") def getSection(name): @@ -480,6 +487,10 @@ def updateConfigs(item): item = ev["object"] namespace = item["metadata"]["namespace"] name = item["metadata"]["name"] + if opt_labels and item['metadata']['labels']: + if not filter_labels(item['metadata']['labels'], opt_labels): + print("Skip resource, not matching labels", item['metadata']['labels']) + continue if ev['type'] == 'ADDED' or ev['type'] == 'MODIFIED': updateConfigs(item) items_in_ns.append(getConfigFile(name, namespace, False)) @@ -493,7 +504,7 @@ def updateConfigs(item): except Exception as e: log.info(str(e)) - def estemplates_worker_run(watch_api, config_dir, customObjectsApi, group, vers, customResource): + def estemplates_worker_run(watch_api, config_dir, customObjectsApi, group, vers, customResource, opt_labels): TEMPLATES_PROVISION_STATUS = {} def updateStatus(item, merge_patch): @@ -602,6 +613,12 @@ def rolloverAlias(item): for ev in watch.stream(customObjectsApi.list_cluster_custom_object, group, vers, customResource, timeout_seconds = 0, _request_timeout = (30, 24*3600)): item = ev['object'] item_resource_version = item['metadata']['resourceVersion'] + + if opt_labels and item['metadata']['labels']: + if not filter_labels(item['metadata']['labels'], opt_labels): + print("SKIPPING") + continue + templateName = item['spec']['templateName'] name = item['metadata']['name'] log.debug(f"Template event {ev['type']}, name: {name}, resource version: {item_resource_version}") @@ -668,13 +685,14 @@ def rolloverAlias(item): resources = [ {"group": "braedon.github.io", "vers": "v1alpha1", "customResource": "esqueries", "fn": esqueries_worker_run}, - {"group": "braedon.github.io", "vers": "v1alpha1", "customResource": "estemplates", "fn": estemplates_worker_run}, ] + if not queries_only: + resources.append({"group": "braedon.github.io", "vers": "v1alpha1", "customResource": "estemplates", "fn": estemplates_worker_run}) thrds = [] for resource in resources: - - worker = Thread(target=resource['fn'],args=(watch, config_dir, customObjectsApi, resource["group"], resource["vers"], resource["customResource"]) ) + worker = Thread(target=resource['fn'],args=(watch, config_dir, customObjectsApi, resource["group"], + resource["vers"], resource["customResource"], opt_labels)) worker.daemon = True worker.start() thrds.append(worker) @@ -832,10 +850,12 @@ def load(self): help='Turn on kubernetes operator mode.') @click.option('--single-metric', default=True, is_flag=True, help='Use just one metrics with labels with operator mode') -@click.option('--filter-customer-resource-labels', +@click.option('--filter-cr-labels', callback=indices_stats_indices_parser, help='Consider only reources which match label. ' 'Labels should be separated by commas e.g. env=dev,app=base.') +@click.option('--operator-queries-only', default=False, is_flag=True, + help='Only quries not templates CR.') @click_config_file.configuration_option() def cli(**options): @@ -878,6 +898,10 @@ def cli(**options): log_handler.setFormatter(formatter) log_level = getattr(logging, options['log_level']) + + # Suppress only the InsecureRequestWarning + warnings.simplefilter('ignore', urllib3.exceptions.InsecureRequestWarning) + warnings.filterwarnings("ignore", message=".*verify_certs=False is insecure.*") logging.basicConfig( handlers=[log_handler], level=logging.DEBUG if options['verbose'] else log_level @@ -959,7 +983,7 @@ def cli(**options): operator = None if options['operator_mode']: - operator = Process(target=kubeOperator, args=(options['config_dir'], es_cluster,)) + operator = Process(target=kubeOperator, args=(options['config_dir'], es_cluster, options['filter_cr_labels'], options['operator_queries_only'])) operator.start() if scheduler: