Skip to content

Commit

Permalink
feat: review threading and fix memory leak
Browse files Browse the repository at this point in the history
Signed-off-by: djerfy <[email protected]>
  • Loading branch information
djerfy committed Jan 11, 2024
1 parent f635595 commit 972a77e
Showing 1 changed file with 27 additions and 21 deletions.
48 changes: 27 additions & 21 deletions src/zabbix-kubernetes-discovery.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python3

import argparse, sys, os, yaml
import argparse, sys, os, yaml, queue
import logging, schedule, threading
from time import sleep
from kubernetes import config as kube_config
Expand Down Expand Up @@ -40,62 +40,68 @@
logging.debug(f"-> Zabbix timeout: {config['zabbix']['timeout']}")
logging.debug(f"-> Cluster name: {config['kubernetes']['name']}")

def mainSend(data):
def executeSender(data):
    """Send one payload of collected monitoring data to the Zabbix server.

    Args:
        data: payload produced by one of the base* collectors; it is logged
            verbatim at debug level before sending.
    """
    try:
        logging.debug(data)
        zabbix.send(data)
    except Exception:
        # Log send failures at error level with the traceback instead of
        # hiding them at debug level — otherwise Zabbix connectivity
        # problems are invisible in normal operation.
        logging.exception("failed to send data to Zabbix")

def mainThread(func):
def executeJobs():
    """Worker loop: consume callables from jobs_queue and run them, forever.

    Runs in a dedicated thread started from __main__. The original version
    processed a single item and returned, so the worker thread died after
    the first job and every later scheduled job accumulated in the queue
    unboundedly (a memory leak). Looping fixes that; a failing job is
    logged and the loop continues so one bad job cannot kill the worker.
    """
    while True:
        job = jobs_queue.get()
        try:
            job()
        except Exception as e:
            logging.error(e)
        finally:
            # Mark the item done even on failure so queue.join() can't hang.
            jobs_queue.task_done()

if __name__ == "__main__":
    logging.info("Application zabbix-kubernetes-discovery started")

    # Queue of zero-argument callables, drained by the executeJobs worker thread.
    jobs_queue = queue.Queue()

    def scheduleResource(collector):
        """Register the discovery and item jobs for one resource collector.

        The collector call is wrapped in a lambda so it executes when the
        worker thread picks the job up, NOT at registration time. (Passing
        executeSender(collector(...)) directly — as the code previously did —
        runs the collector once right here and then enqueues its None return
        value on every tick, which the worker cannot call.)
        """
        schedule.every(config['zabbix']['schedule']['discovery']).seconds.do(
            jobs_queue.put, lambda: executeSender(collector(mode="discovery", config=config)))
        schedule.every(config['zabbix']['schedule']['items']).seconds.do(
            jobs_queue.put, lambda: executeSender(collector(mode="item", config=config)))

    # cronjobs
    if config['monitoring']['cronjobs']['enabled']:
        scheduleResource(baseCronjobs)

    # daemonsets
    if config['monitoring']['daemonsets']['enabled']:
        scheduleResource(baseDaemonsets)

    # deployments
    if config['monitoring']['deployments']['enabled']:
        scheduleResource(baseDeployments)

    # nodes
    if config['monitoring']['nodes']['enabled']:
        scheduleResource(baseNodes)

    # statefulsets
    if config['monitoring']['statefulsets']['enabled']:
        scheduleResource(baseStatefulsets)

    # volumes
    if config['monitoring']['volumes']['enabled']:
        scheduleResource(baseVolumes)

    # openebs
    if config['monitoring']['openebs']['enabled']:
        # cstorpoolclusters
        scheduleResource(baseOpenebsCstorpoolclusters)
        # cstorpoolinstances
        scheduleResource(baseOpenebsCstorpoolinstances)

    # thread: single worker that drains jobs_queue (see executeJobs)
    thread = threading.Thread(target=executeJobs)
    thread.start()

# tasks
while True:
Expand Down

0 comments on commit 972a77e

Please sign in to comment.