Skip to content

Commit

Permalink
Merge pull request #61 from shinken-monitoring/clean_main_module
Browse files Browse the repository at this point in the history
Clean and simplification in main module
  • Loading branch information
gst committed May 8, 2015
2 parents 0880de2 + 4b5a272 commit 7f7e307
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 186 deletions.
243 changes: 59 additions & 184 deletions module/module.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def get_instance(plugin):

#############################################################################

import errno
import select
import socket
import os
Expand Down Expand Up @@ -104,7 +105,6 @@ def __init__(self, modconf):
# We can be in a scheduler. If so, we keep a link to it to speed up regenerator phase
self.scheduler = None
self.plugins = []
self.use_threads = (getattr(modconf, 'use_threads', '0') == '1')
self.host = getattr(modconf, 'host', '127.0.0.1')
if self.host == '*':
self.host = '0.0.0.0'
Expand Down Expand Up @@ -149,7 +149,10 @@ def __init__(self, modconf):
# We need to have our regenerator now because it will need to load
# data from scheduler before main() if in scheduler of course
self.rg = LiveStatusRegenerator(self.service_authorization_strict, self.group_authorization_strict)
self.client_connections = {} # keys will be socket of client, values are LiveStatusClientThread instances

self.client_connections = {} # keys will be socket of client,
# values are LiveStatusClientThread instances

self.db = None
self.listeners = []
self._listening_thread = threading.Thread(target=self._listening_thread_run)
Expand Down Expand Up @@ -236,8 +239,6 @@ def main(self):
self.rg.register_cache(self.query_cache)

try:
#import cProfile
#cProfile.runctx('''self.do_main()''', globals(), locals(),'/tmp/livestatus.profile')
self.do_main()
except Exception, exp:
msg = Message(id=0, type='ICrash', data={
Expand Down Expand Up @@ -276,125 +277,15 @@ def do_main(self):
if self.db.max_logs_age > 0:
self.db.log_db_do_archive()

# We ill protect the operations on
# the non read+write with a lock and
# 2 int
self.global_lock = threading.RLock()
self.nb_readers = 0
self.nb_writers = 0

self.data_thread = None

# Check if some og the required directories exist
#if not os.path.exists(bottle.TEMPLATE_PATH[0]):
# logger.error('The view path do not exist at %s' % bottle.TEMPLATE_PATH)
# sys.exit(2)

self.load_plugins()

if self.use_threads:
# Launch the data thread"
logger.info("[Livestatus Broker] Starting Livestatus application")
self.data_thread = threading.Thread(None, self.manage_brok_thread, 'datathread')
self.data_thread.start()
self.lql_thread = threading.Thread(None, self.manage_lql_thread, 'lqlthread')
self.lql_thread.start()
self.data_thread.join()
self.lql_thread.join()
else:
self.manage_lql_thread()


# It's the thread function that will get broks
# and update data. Will lock the whole thing
# while updating
def manage_brok_thread(self):
logger.info("[Livestatus Broker] Data thread started")
while True:
l = self.to_q.get()
for b in l:
# Un-serialize the brok data
b.prepare()
# For updating, we cannot do it while
# answer queries, so wait for no readers
self.wait_for_no_readers()
try:
logger.debug("[Livestatus Broker] Got data lock, manage brok")
self.rg.manage_brok(b)
for mod in self.modules_manager.get_internal_instances():
try:
mod.manage_brok(b)
except Exception, exp:
logger.debug("[Livestatus Broker] %s" % str(exp.__dict__))
logger.warning("[%s] The mod %s raise an exception: %s, I'm tagging it to restart later" % (self.name, mod.get_name(), str(exp)))
logger.debug("[%s] Exception type: %s" % (self.name, type(exp)))
logger.debug("Back trace of this kill: %s" % (traceback.format_exc()))
self.modules_manager.set_to_restart(mod)
except Exception, exp:
msg = Message(id=0, type='ICrash', data={
'name': self.get_name(),
'exception': exp,
'trace': traceback.format_exc()
})
self.from_q.put(msg)
# wait 2 sec so we know that the broker got our message, and die
time.sleep(2)
# No need to raise here, we are in a thread, exit!
os._exit(2)
#finally:
# We can remove us as a writer from now. It's NOT an atomic operation
# so we REALLY not need a lock here (yes, I try without and I got
# a not so accurate value there....)
self.global_lock.acquire()
self.nb_writers -= 1
self.global_lock.release()
self.main_thread_run()

# Here we will load all plugins (pages) under the webui/plugins
# directory. Each one can have a page, views and htdocs dir that we must
# route correctly
def load_plugins(self):
pass

# It will say if we can launch a page rendering or not.
# We can only if there is no writer running from now
def wait_for_no_writers(self):
while True:
self.global_lock.acquire()
# We will be able to run
if self.nb_writers == 0:
# Ok, we can run, register us as readers
self.nb_readers += 1
self.global_lock.release()
break
# Oups, a writer is in progress. We must wait a bit
self.global_lock.release()
# Before checking again, we should wait a bit
# like 1ms
time.sleep(0.001)

# It will say if we can launch a brok management or not
# We can only if there is no readers running from now
def wait_for_no_readers(self):
start = time.time()
while True:
self.global_lock.acquire()
# We will be able to run
if self.nb_readers == 0:
# Ok, we can run, register us as writers
self.nb_writers += 1
self.global_lock.release()
break
# Ok, we cannot run now, wait a bit
self.global_lock.release()
# Before checking again, we should wait a bit
# like 1ms
time.sleep(0.001)
# We should warn if we cannot update broks
# for more than 30s because it can be not good
if time.time() - start > 30:
logger.warning("[Livestatus Broker] WARNING: we are in lock/read since more than 30s!")
start = time.time()

def manage_brok(self, brok):
"""We use this method mostly for the unit tests"""
brok.prepare()
Expand All @@ -418,6 +309,7 @@ def do_stop(self):
for client in self.client_connections.values():
assert isinstance(client, LiveStatusClientThread)
client.join()
self.client_connections.clear()
if self._listening_thread:
self._listening_thread.join()
# inputs must be closed after listening_thread
Expand All @@ -428,7 +320,6 @@ def do_stop(self):
except Exception as err:
logger.warning('Error on db close: %s' % err)


def create_listeners(self):
backlog = 5
if self.port:
Expand Down Expand Up @@ -459,43 +350,34 @@ def create_listeners(self):
self.listeners.append(sock)
logger.info("[Livestatus Broker] listening on unix socket: %s" % str(self.socket))


def _listening_thread_run(self):
while not self.interrupted:
# Check for pending livestatus requests
# Check for pending livestatus new connection..
inputready, _, exceptready = select.select(self.listeners, [], [], 1)

if len(exceptready) > 0:
pass

if len(inputready) > 0:
for s in inputready:
# We will identify sockets by their filehandle number
# during the rest of this loop
#socketid = s.fileno()
if s in self.listeners:
# handle the server socket
sock, address = s.accept()
if isinstance(address, tuple):
client_ip, _ = address
if self.allowed_hosts and client_ip not in self.allowed_hosts:
logger.warning("[Livestatus Broker] Connection attempt from illegal ip address %s" % str(client_ip))
full_safe_close(sock)
continue

new_client = self.client_connections[sock] = LiveStatusClientThread(sock, address, self)
new_client.start()
self.livestatus.count_event('connections')

# end for s in inputready:
pass # TODO ?

for s in inputready:
# handle the server socket
sock, address = s.accept()
if isinstance(address, tuple):
client_ip, _ = address
if self.allowed_hosts and client_ip not in self.allowed_hosts:
logger.warning("[Livestatus Broker] Connection attempt from illegal ip address %s" % str(client_ip))
full_safe_close(sock)
continue

new_client = self.client_connections[sock] = LiveStatusClientThread(sock, address, self)
new_client.start()
self.livestatus.count_event('connections')
# end for s in inputready:

# At the end of this loop we probably will discard connections
kick_connections = []
for sock, client in self.client_connections.items():
assert isinstance(client, LiveStatusClientThread)
if client.is_alive():
pass
else:
if not client.is_alive():
kick_connections.append(sock)

for sock in kick_connections:
Expand All @@ -504,56 +386,49 @@ def _listening_thread_run(self):
# It's the thread function that will get broks
# and update data. Will lock the whole thing
# while updating
def manage_lql_thread(self):
def main_thread_run(self):
logger.info("[Livestatus Broker] Livestatus query thread started")
self.db.open() # make sure to open the db in this thread..
self.db.open() # make sure to open the db in this thread..
# This is the main object of this broker where the action takes place
self.livestatus = LiveStatus(self.datamgr, self.query_cache, self.db, self.pnp_path, self.from_q)
self.create_listeners()
self._listening_thread.start()

db_commit_next_time = time.time()

while not self.interrupted:
if self.use_threads:
self.wait_for_no_writers()
self.livestatus.counters.calc_rate()
else:
try:
l = self.to_q.get(True, 1)
for b in l:
# Un-serialize the brok data
b.prepare()
self.rg.manage_brok(b)
for mod in self.modules_manager.get_internal_instances():
try:
mod.manage_brok(b)
except Exception, exp:
logger.debug("[Livestatus Broker] %s" % str(exp.__dict__))
logger.warning("[%s] Warning: The mod %s raise an exception: %s, I'm tagging it to restart later" % (self.name, mod.get_name(), str(exp)))
logger.debug("[%s] Exception type: %s" % (self.name, type(exp)))
logger.debug("Back trace of this kill: %s" % (traceback.format_exc()))
self.modules_manager.set_to_restart(mod)
except Queue.Empty:
self.livestatus.counters.calc_rate()
except IOError, e:
if hasattr(os, 'errno') and e.errno != os.errno.EINTR:
raise
except Exception, exp:
logger.debug("[Livestatus Broker] %s" % str(exp.__dict__))
logger.error("[%s] Warning: The mod %s raise an exception: %s, I'm tagging it to restart later" % (self.name, mod.get_name(), str(exp)))
logger.debug("[%s] Exception type: %s" % (self.name, type(exp)))
logger.debug("Back trace of this kill: %s" % (traceback.format_exc()))
now = time.time()

self.livestatus.counters.calc_rate()

if db_commit_next_time < now:
db_commit_next_time = now + 3 # only commit every ~3 secs
self.db.commit_and_rotate_log_db()

try:
l = self.to_q.get(True, 1)
except IOError as err:
if err.errno != os.errno.EINTR:
raise
except Queue.Empty:
pass
else:
for b in l:
b.prepare() # Un-serialize the brok data
self.rg.manage_brok(b)
for mod in self.modules_manager.get_internal_instances():
try:
mod.manage_brok(b)
except Exception as err:
logger.exception(
"[%s] Warning: The mod %s raise an exception: %s,"
"I'm tagging it to restart later",
self.name, mod.get_name(), err)
self.modules_manager.set_to_restart(mod)

# Commit log broks to the database
self.db.commit_and_rotate_log_db()
# just to have eventually more broks accumulated
# in our input queue:
time.sleep(0.1)

# end: while not self.interrupted:
self.do_stop()

def write_protocol(self, request=None, response=None, sent=0):
if self.debug_queries:
if request is not None:
print "REQUEST>>>>>\n" + request + "\n\n"
if response is not None:
print "RESPONSE<<<<\n" + response + "\n"
print "RESPONSE SENT<<<<\n %s \n\n" % sent
2 changes: 1 addition & 1 deletion test/test_livestatus_allowedhosts.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def init_livestatus(self, conf):

# NOTE: function is blocking, so must be launched in a thread
#self.livestatus_broker.do_main()
self.lql_thread = threading.Thread(None, self.livestatus_broker.manage_lql_thread, 'lqlthread')
self.lql_thread = threading.Thread(None, self.livestatus_broker.main_thread_run, 'lqlthread')
self.lql_thread.start()
# wait for thread to init
time.sleep(3)
Expand Down
2 changes: 1 addition & 1 deletion test/test_wait_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def init_livestatus(self, conf):
self.sched.fill_initial_broks('Default-Broker')
self.update_broker()
# execute the livestatus by starting a dedicated thread to run the manage_lql_thread function:
self.lql_thread = threading.Thread(target=self.livestatus_broker.manage_lql_thread, name='lqlthread')
self.lql_thread = threading.Thread(target=self.livestatus_broker.main_thread_run, name='lqlthread')
self.lql_thread.start()
t0 = time.time()
# give some time for the thread to init and creates its listener socket(s) :
Expand Down

0 comments on commit 7f7e307

Please sign in to comment.