Skip to content

Commit

Permalink
#76 implemented flush grouped metrics when it program terminates
Browse files Browse the repository at this point in the history
  • Loading branch information
root committed Sep 10, 2018
1 parent 18ea7bf commit c906595
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 7 deletions.
12 changes: 8 additions & 4 deletions basescript/basescript.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,18 @@ def __init__(self, args=None):
if self.args.metric_grouping_interval is None:
self.args.metric_grouping_interval = self.METRIC_GROUPING_INTERVAL

self.log = init_logger(
log = init_logger(
fmt=self.args.log_format,
quiet=self.args.quiet,
level=self.args.log_level,
fpath=self.args.log_file,
pre_hooks=self.define_log_pre_format_hooks(),
post_hooks=self.define_log_post_format_hooks(),
metric_grouping_interval=self.args.metric_grouping_interval
).bind(name=self.args.name)
)

self._flush_metrics_q = log._force_flush_q
self.log = log.bind(name=self.args.name)

self.stats = Dummy()

Expand All @@ -73,12 +76,13 @@ def start(self):
raise
except KeyboardInterrupt:
self.log.warning("exited via keyboard interrupt")
sys.exit(1)
except:
self.log.exception("exited start function")
# set exit code so we know it did not end successfully
# TODO different exit codes based on signals ?
sys.exit(1)
finally:
self._flush_metrics_q.put(None, block=True)
self._flush_metrics_q.put(None, block=True, timeout=1)

self.log.info("exited successfully")

Expand Down
18 changes: 16 additions & 2 deletions basescript/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,18 @@
import socket
import logging
import numbers
import queue
from threading import Thread, Lock
from datetime import datetime
from functools import wraps

from deeputil import Dummy
from deeputil import Dummy, keeprunning
import structlog

# stdlib to structlog handlers should be configured only once.
_GLOBAL_LOG_CONFIGURED = False

FORCE_FLUSH_Q_SIZE = 1
HOSTNAME = socket.gethostname()
METRICS_STATE = {}
METRICS_STATE_LOCK = Lock()
Expand Down Expand Up @@ -270,11 +272,18 @@ def _structlog_default_keys_processor(logger_class, log_method, event):

return event

@keeprunning()
def dump_metrics(log, interval):
global METRICS_STATE

terminate = False

while True:
time.sleep(interval)
try:
log._force_flush_q.get(block=True, timeout=interval)
terminate = True
except queue.Empty:
pass

METRICS_STATE_LOCK.acquire()
m = METRICS_STATE
Expand All @@ -292,6 +301,9 @@ def dump_metrics(log, interval):
fn = getattr(log, level)
fn(event, type='metric', __grouped__=True, num=n, **d)

if terminate:
break

def metrics_grouping_processor(logger_class, log_method, event):
if event.get('type') == 'logged_metric':
event['type'] = 'metric'
Expand Down Expand Up @@ -459,6 +471,8 @@ def init_logger(
level = getattr(logging, level.upper())
log.setLevel(level)

log._force_flush_q = queue.Queue(maxsize=FORCE_FLUSH_Q_SIZE)

if metric_grouping_interval:
keep_running = Thread(target=dump_metrics, args=(log, metric_grouping_interval))
keep_running.daemon = True
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def get_long_description():

long_description = get_long_description()

version = '0.2.6'
version = '0.2.7'
setup(
name="basescript",
version=version,
Expand Down

0 comments on commit c906595

Please sign in to comment.