diff --git a/module/module.py b/module/module.py
index e4ec5bc..aafa539 100644
--- a/module/module.py
+++ b/module/module.py
@@ -47,6 +47,7 @@ def get_instance(plugin):
 
 #############################################################################
 
+import errno
 import select
 import socket
 import os
@@ -104,7 +105,6 @@ def __init__(self, modconf):
         # We can be in a scheduler. If so, we keep a link to it to speed up regenerator phase
         self.scheduler = None
         self.plugins = []
-        self.use_threads = (getattr(modconf, 'use_threads', '0') == '1')
         self.host = getattr(modconf, 'host', '127.0.0.1')
         if self.host == '*':
             self.host = '0.0.0.0'
@@ -149,7 +149,10 @@ def __init__(self, modconf):
         # We need to have our regenerator now because it will need to load
         # data from scheduler before main() if in scheduler of course
         self.rg = LiveStatusRegenerator(self.service_authorization_strict, self.group_authorization_strict)
-        self.client_connections = {} # keys will be socket of client, values are LiveStatusClientThread instances
+
+        self.client_connections = {}  # keys will be socket of client,
+                                      # values are LiveStatusClientThread instances
+
         self.db = None
         self.listeners = []
         self._listening_thread = threading.Thread(target=self._listening_thread_run)
@@ -236,8 +239,6 @@ def main(self):
         self.rg.register_cache(self.query_cache)
 
         try:
-            #import cProfile
-            #cProfile.runctx('''self.do_main()''', globals(), locals(),'/tmp/livestatus.profile')
             self.do_main()
         except Exception, exp:
             msg = Message(id=0, type='ICrash', data={
@@ -276,78 +277,8 @@ def do_main(self):
         if self.db.max_logs_age > 0:
             self.db.log_db_do_archive()
 
-        # We ill protect the operations on
-        # the non read+write with a lock and
-        # 2 int
-        self.global_lock = threading.RLock()
-        self.nb_readers = 0
-        self.nb_writers = 0
-
-        self.data_thread = None
-
-        # Check if some og the required directories exist
-        #if not os.path.exists(bottle.TEMPLATE_PATH[0]):
-        #    logger.error('The view path do not exist at %s' % bottle.TEMPLATE_PATH)
-        #    sys.exit(2)
-        self.load_plugins()
-
-        if self.use_threads:
-            # Launch the data thread"
-            logger.info("[Livestatus Broker] Starting Livestatus application")
-            self.data_thread = threading.Thread(None, self.manage_brok_thread, 'datathread')
-            self.data_thread.start()
-            self.lql_thread = threading.Thread(None, self.manage_lql_thread, 'lqlthread')
-            self.lql_thread.start()
-            self.data_thread.join()
-            self.lql_thread.join()
-        else:
-            self.manage_lql_thread()
-
-
-    # It's the thread function that will get broks
-    # and update data. Will lock the whole thing
-    # while updating
-    def manage_brok_thread(self):
-        logger.info("[Livestatus Broker] Data thread started")
-        while True:
-            l = self.to_q.get()
-            for b in l:
-                # Un-serialize the brok data
-                b.prepare()
-                # For updating, we cannot do it while
-                # answer queries, so wait for no readers
-                self.wait_for_no_readers()
-                try:
-                    logger.debug("[Livestatus Broker] Got data lock, manage brok")
-                    self.rg.manage_brok(b)
-                    for mod in self.modules_manager.get_internal_instances():
-                        try:
-                            mod.manage_brok(b)
-                        except Exception, exp:
-                            logger.debug("[Livestatus Broker] %s" % str(exp.__dict__))
-                            logger.warning("[%s] The mod %s raise an exception: %s, I'm tagging it to restart later" % (self.name, mod.get_name(), str(exp)))
-                            logger.debug("[%s] Exception type: %s" % (self.name, type(exp)))
-                            logger.debug("Back trace of this kill: %s" % (traceback.format_exc()))
-                            self.modules_manager.set_to_restart(mod)
-                except Exception, exp:
-                    msg = Message(id=0, type='ICrash', data={
-                        'name': self.get_name(),
-                        'exception': exp,
-                        'trace': traceback.format_exc()
-                    })
-                    self.from_q.put(msg)
-                    # wait 2 sec so we know that the broker got our message, and die
-                    time.sleep(2)
-                    # No need to raise here, we are in a thread, exit!
-                    os._exit(2)
-                #finally:
-                # We can remove us as a writer from now. It's NOT an atomic operation
-                # so we REALLY not need a lock here (yes, I try without and I got
-                # a not so accurate value there....)
-                self.global_lock.acquire()
-                self.nb_writers -= 1
-                self.global_lock.release()
+        self.main_thread_run()
 
 
     # Here we will load all plugins (pages) under the webui/plugins
     # directory. Each one can have a page, views and htdocs dir that we must
@@ -355,46 +286,6 @@ def manage_brok_thread(self):
     def load_plugins(self):
         pass
 
-    # It will say if we can launch a page rendering or not.
-    # We can only if there is no writer running from now
-    def wait_for_no_writers(self):
-        while True:
-            self.global_lock.acquire()
-            # We will be able to run
-            if self.nb_writers == 0:
-                # Ok, we can run, register us as readers
-                self.nb_readers += 1
-                self.global_lock.release()
-                break
-            # Oups, a writer is in progress. We must wait a bit
-            self.global_lock.release()
-            # Before checking again, we should wait a bit
-            # like 1ms
-            time.sleep(0.001)
-
-    # It will say if we can launch a brok management or not
-    # We can only if there is no readers running from now
-    def wait_for_no_readers(self):
-        start = time.time()
-        while True:
-            self.global_lock.acquire()
-            # We will be able to run
-            if self.nb_readers == 0:
-                # Ok, we can run, register us as writers
-                self.nb_writers += 1
-                self.global_lock.release()
-                break
-            # Ok, we cannot run now, wait a bit
-            self.global_lock.release()
-            # Before checking again, we should wait a bit
-            # like 1ms
-            time.sleep(0.001)
-            # We should warn if we cannot update broks
-            # for more than 30s because it can be not good
-            if time.time() - start > 30:
-                logger.warning("[Livestatus Broker] WARNING: we are in lock/read since more than 30s!")
-                start = time.time()
-
     def manage_brok(self, brok):
         """We use this method mostly for the unit tests"""
         brok.prepare()
@@ -418,6 +309,7 @@ def do_stop(self):
         for client in self.client_connections.values():
             assert isinstance(client, LiveStatusClientThread)
             client.join()
+        self.client_connections.clear()
         if self._listening_thread:
             self._listening_thread.join()
         # inputs must be closed after listening_thread
@@ -428,7 +320,6 @@ def do_stop(self):
         except Exception as err:
             logger.warning('Error on db close: %s' % err)
 
-
     def create_listeners(self):
         backlog = 5
         if self.port:
@@ -459,43 +350,34 @@ def create_listeners(self):
            self.listeners.append(sock)
            logger.info("[Livestatus Broker] listening on unix socket: %s" % str(self.socket))
-
 
    def _listening_thread_run(self):
        while not self.interrupted:
-            # Check for pending livestatus requests
+            # Check for pending livestatus new connection..
            inputready, _, exceptready = select.select(self.listeners, [], [], 1)
 
            if len(exceptready) > 0:
-                pass
-
-            if len(inputready) > 0:
-                for s in inputready:
-                    # We will identify sockets by their filehandle number
-                    # during the rest of this loop
-                    #socketid = s.fileno()
-                    if s in self.listeners:
-                        # handle the server socket
-                        sock, address = s.accept()
-                        if isinstance(address, tuple):
-                            client_ip, _ = address
-                            if self.allowed_hosts and client_ip not in self.allowed_hosts:
-                                logger.warning("[Livestatus Broker] Connection attempt from illegal ip address %s" % str(client_ip))
-                                full_safe_close(sock)
-                                continue
-
-                        new_client = self.client_connections[sock] = LiveStatusClientThread(sock, address, self)
-                        new_client.start()
-                        self.livestatus.count_event('connections')
-
-                # end for s in inputready:
+                pass # TODO ?
+
+            for s in inputready:
+                # handle the server socket
+                sock, address = s.accept()
+                if isinstance(address, tuple):
+                    client_ip, _ = address
+                    if self.allowed_hosts and client_ip not in self.allowed_hosts:
+                        logger.warning("[Livestatus Broker] Connection attempt from illegal ip address %s" % str(client_ip))
+                        full_safe_close(sock)
+                        continue
+
+                new_client = self.client_connections[sock] = LiveStatusClientThread(sock, address, self)
+                new_client.start()
+                self.livestatus.count_event('connections')
+            # end for s in inputready:
 
            # At the end of this loop we probably will discard connections
            kick_connections = []
            for sock, client in self.client_connections.items():
                assert isinstance(client, LiveStatusClientThread)
-                if client.is_alive():
-                    pass
-                else:
+                if not client.is_alive():
                    kick_connections.append(sock)
 
            for sock in kick_connections:
@@ -504,56 +386,49 @@ def _listening_thread_run(self):
     # It's the thread function that will get broks
     # and update data. Will lock the whole thing
     # while updating
-    def manage_lql_thread(self):
+    def main_thread_run(self):
         logger.info("[Livestatus Broker] Livestatus query thread started")
-        self.db.open() # make sure to open the db in this thread..
+        self.db.open()  # make sure to open the db in this thread..
 
         # This is the main object of this broker where the action takes place
         self.livestatus = LiveStatus(self.datamgr, self.query_cache, self.db, self.pnp_path, self.from_q)
 
         self.create_listeners()
         self._listening_thread.start()
+        db_commit_next_time = time.time()
+
         while not self.interrupted:
-            if self.use_threads:
-                self.wait_for_no_writers()
-                self.livestatus.counters.calc_rate()
-            else:
-                try:
-                    l = self.to_q.get(True, 1)
-                    for b in l:
-                        # Un-serialize the brok data
-                        b.prepare()
-                        self.rg.manage_brok(b)
-                        for mod in self.modules_manager.get_internal_instances():
-                            try:
-                                mod.manage_brok(b)
-                            except Exception, exp:
-                                logger.debug("[Livestatus Broker] %s" % str(exp.__dict__))
-                                logger.warning("[%s] Warning: The mod %s raise an exception: %s, I'm tagging it to restart later" % (self.name, mod.get_name(), str(exp)))
-                                logger.debug("[%s] Exception type: %s" % (self.name, type(exp)))
-                                logger.debug("Back trace of this kill: %s" % (traceback.format_exc()))
-                                self.modules_manager.set_to_restart(mod)
-                except Queue.Empty:
-                    self.livestatus.counters.calc_rate()
-                except IOError, e:
-                    if hasattr(os, 'errno') and e.errno != os.errno.EINTR:
-                        raise
-                except Exception, exp:
-                    logger.debug("[Livestatus Broker] %s" % str(exp.__dict__))
-                    logger.error("[%s] Warning: The mod %s raise an exception: %s, I'm tagging it to restart later" % (self.name, mod.get_name(), str(exp)))
-                    logger.debug("[%s] Exception type: %s" % (self.name, type(exp)))
-                    logger.debug("Back trace of this kill: %s" % (traceback.format_exc()))
+            now = time.time()
+
+            self.livestatus.counters.calc_rate()
+
+            if db_commit_next_time < now:
+                db_commit_next_time = now + 3  # only commit every ~3 secs
+                self.db.commit_and_rotate_log_db()
+
+            try:
+                l = self.to_q.get(True, 1)
+            except IOError as err:
+                if err.errno != os.errno.EINTR: raise
+            except Queue.Empty:
+                pass
+            else:
+                for b in l:
+                    b.prepare()  # Un-serialize the brok data
+                    self.rg.manage_brok(b)
+                    for mod in self.modules_manager.get_internal_instances():
+                        try:
+                            mod.manage_brok(b)
+                        except Exception as err:
+                            logger.exception(
+                                "[%s] Warning: The mod %s raise an exception: %s,"
+                                "I'm tagging it to restart later",
+                                self.name, mod.get_name(), err)
+                            self.modules_manager.set_to_restart(mod)
 
-            # Commit log broks to the database
-            self.db.commit_and_rotate_log_db()
+            # just to have eventually more broks accumulated
+            # in our input queue:
+            time.sleep(0.1)
         # end: while not self.interrupted:
         self.do_stop()
 
-
-    def write_protocol(self, request=None, response=None, sent=0):
-        if self.debug_queries:
-            if request is not None:
-                print "REQUEST>>>>>\n" + request + "\n\n"
-            if response is not None:
-                print "RESPONSE<<<<\n" + response + "\n"
-            print "RESPONSE SENT<<<<\n %s \n\n" % sent
diff --git a/test/test_livestatus_allowedhosts.py b/test/test_livestatus_allowedhosts.py
index 90d7bcf..d234c2d 100644
--- a/test/test_livestatus_allowedhosts.py
+++ b/test/test_livestatus_allowedhosts.py
@@ -71,7 +71,7 @@ def init_livestatus(self, conf):
 
         # NOTE: function is blocking, so must be launched in a thread
         #self.livestatus_broker.do_main()
-        self.lql_thread = threading.Thread(None, self.livestatus_broker.manage_lql_thread, 'lqlthread')
+        self.lql_thread = threading.Thread(None, self.livestatus_broker.main_thread_run, 'lqlthread')
         self.lql_thread.start()
         # wait for thread to init
         time.sleep(3)
diff --git a/test/test_wait_query.py b/test/test_wait_query.py
index 444c29e..9214df2 100644
--- a/test/test_wait_query.py
+++ b/test/test_wait_query.py
@@ -78,7 +78,7 @@ def init_livestatus(self, conf):
         self.sched.fill_initial_broks('Default-Broker')
         self.update_broker()
         # execute the livestatus by starting a dedicated thread to run the manage_lql_thread function:
-        self.lql_thread = threading.Thread(target=self.livestatus_broker.manage_lql_thread, name='lqlthread')
+        self.lql_thread = threading.Thread(target=self.livestatus_broker.main_thread_run, name='lqlthread')
         self.lql_thread.start()
         t0 = time.time()
         # give some time for the thread to init and creates its listener socket(s) :
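
Not part of the patch above: the sketch below is a minimal, standalone illustration (main_loop, handle_brok, commit_db and is_interrupted are made-up names, not Shinken APIs) of the loop shape that the refactored main_thread_run() settles on once the reader/writer locks and the optional data thread are gone: drain the broker queue with a one-second timeout, tolerate EINTR, and throttle the log-database commit to roughly every three seconds.

# Illustrative sketch only -- not Shinken code.  It mirrors the structure of
# main_thread_run(): one thread polls the queue with a timeout, retries on
# EINTR, and throttles periodic work instead of taking reader/writer locks.
import errno
import time
import Queue  # Python 2; the module is named "queue" on Python 3


def main_loop(to_q, handle_brok, commit_db, is_interrupted, commit_every=3):
    next_commit = time.time()
    while not is_interrupted():
        now = time.time()
        if next_commit < now:
            next_commit = now + commit_every  # commit only every few seconds
            commit_db()
        try:
            broks = to_q.get(True, 1)         # block for at most one second
        except IOError as err:
            if err.errno != errno.EINTR:      # a signal interrupted the wait
                raise
        except Queue.Empty:
            pass
        else:
            for brok in broks:
                handle_brok(brok)             # e.g. regenerator + sub-modules
        time.sleep(0.1)                       # let a few more broks accumulate

A caller would run this in a single dedicated thread, e.g. threading.Thread(target=main_loop, args=(...)), which mirrors how the two tests above now start main_thread_run.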