From 7b50f7a56f25e3548fcd7c0fce07449d724606a8 Mon Sep 17 00:00:00 2001
From: Cameron Lee
Date: Thu, 7 Oct 2021 22:24:55 -0600
Subject: [PATCH 1/2] Properly timing coroutines. Resolves #119.

---
 docs/timing.rst              | 20 ++++++++++
 statsd/client/async_timer.py | 17 ++++++++
 statsd/client/base_timer.py  | 74 ++++++++++++++++++++++++++++++++++++
 statsd/client/timer.py       | 76 +++----------------------------------
 4 files changed, 117 insertions(+), 70 deletions(-)
 create mode 100644 statsd/client/async_timer.py
 create mode 100644 statsd/client/base_timer.py

diff --git a/docs/timing.rst b/docs/timing.rst
index b9cf5245d..83eec9b63 100644
--- a/docs/timing.rst
+++ b/docs/timing.rst
@@ -88,6 +88,26 @@ execute will be sent to the statsd server.
     myfunc(1, 2)
     myfunc(3, 7)

+This works with async functions too (Python 3.5+). The timer measures the total elapsed time from
+when the coroutine starts running until it completes, including time spent waiting on other coroutines.
+
+.. code-block:: python
+
+    from statsd import StatsClient
+
+    statsd = StatsClient()
+
+    @statsd.timer('myfunc')
+    async def myfunc(a, b):
+        """Do asynchronous IO for a and b"""
+
+    async def main():
+        # Timing is measured even while myfunc yields and other coroutines are running.
+        await myfunc(1, 2)
+
+    # Run main() using your choice of event loop here (asyncio, trio, etc.):
+    import trio
+    trio.run(main)

 .. _timer-object:

diff --git a/statsd/client/async_timer.py b/statsd/client/async_timer.py
new file mode 100644
index 000000000..d3a68ea25
--- /dev/null
+++ b/statsd/client/async_timer.py
@@ -0,0 +1,17 @@
+from inspect import iscoroutinefunction
+from .base_timer import Timer, time_now, safe_wraps
+
+class AsyncTimer(Timer):
+    def __call__(self, f):
+        if iscoroutinefunction(f):
+            @safe_wraps(f)
+            async def _wrapped(*args, **kwargs):
+                start_time = time_now()
+                try:
+                    return await f(*args, **kwargs)
+                finally:
+                    elapsed_time_ms = 1000.0 * (time_now() - start_time)
+                    self.client.timing(self.stat, elapsed_time_ms, self.rate)
+            return _wrapped
+        else:
+            return super().__call__(f)
diff --git a/statsd/client/base_timer.py b/statsd/client/base_timer.py
new file mode 100644
index 000000000..8c0a74dff
--- /dev/null
+++ b/statsd/client/base_timer.py
@@ -0,0 +1,74 @@
+from __future__ import absolute_import, division, unicode_literals
+
+import functools
+
+# Use timer that's not susceptible to time of day adjustments.
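+# perf_counter() is monotonic, so a measured interval can't be skewed by the
+# system clock being adjusted mid-timing; time.time() (the fallback below)
+# offers no such guarantee.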
+try: + # perf_counter is only present on Py3.3+ + from time import perf_counter as time_now +except ImportError: + # fall back to using time + from time import time as time_now + + +def safe_wraps(wrapper, *args, **kwargs): + """Safely wraps partial functions.""" + while isinstance(wrapper, functools.partial): + wrapper = wrapper.func + return functools.wraps(wrapper, *args, **kwargs) + + +class Timer(object): + """A context manager/decorator for statsd.timing().""" + + def __init__(self, client, stat, rate=1): + self.client = client + self.stat = stat + self.rate = rate + self.ms = None + self._sent = False + self._start_time = None + + def __call__(self, f): + """Thread-safe timing function decorator.""" + @safe_wraps(f) + def _wrapped(*args, **kwargs): + start_time = time_now() + try: + return f(*args, **kwargs) + finally: + elapsed_time_ms = 1000.0 * (time_now() - start_time) + self.client.timing(self.stat, elapsed_time_ms, self.rate) + return _wrapped + + def __enter__(self): + return self.start() + + def __exit__(self, typ, value, tb): + self.stop() + + def start(self): + self.ms = None + self._sent = False + self._start_time = time_now() + return self + + def stop(self, send=True): + if self._start_time is None: + raise RuntimeError('Timer has not started.') + dt = time_now() - self._start_time + self.ms = 1000.0 * dt # Convert to milliseconds. + if send: + self.send() + return self + + def send(self): + if self.ms is None: + raise RuntimeError('No data recorded.') + if self._sent: + raise RuntimeError('Already sent data.') + self._sent = True + self.client.timing(self.stat, self.ms, self.rate) diff --git a/statsd/client/timer.py b/statsd/client/timer.py index fefc9d042..564fdca17 100644 --- a/statsd/client/timer.py +++ b/statsd/client/timer.py @@ -1,71 +1,7 @@ -from __future__ import absolute_import, division, unicode_literals +# Only including the async functionality in timer if on a Python version with coroutines +import sys -import functools - -# Use timer that's not susceptible to time of day adjustments. -try: - # perf_counter is only present on Py3.3+ - from time import perf_counter as time_now -except ImportError: - # fall back to using time - from time import time as time_now - - -def safe_wraps(wrapper, *args, **kwargs): - """Safely wraps partial functions.""" - while isinstance(wrapper, functools.partial): - wrapper = wrapper.func - return functools.wraps(wrapper, *args, **kwargs) - - -class Timer(object): - """A context manager/decorator for statsd.timing().""" - - def __init__(self, client, stat, rate=1): - self.client = client - self.stat = stat - self.rate = rate - self.ms = None - self._sent = False - self._start_time = None - - def __call__(self, f): - """Thread-safe timing function decorator.""" - @safe_wraps(f) - def _wrapped(*args, **kwargs): - start_time = time_now() - try: - return f(*args, **kwargs) - finally: - elapsed_time_ms = 1000.0 * (time_now() - start_time) - self.client.timing(self.stat, elapsed_time_ms, self.rate) - return _wrapped - - def __enter__(self): - return self.start() - - def __exit__(self, typ, value, tb): - self.stop() - - def start(self): - self.ms = None - self._sent = False - self._start_time = time_now() - return self - - def stop(self, send=True): - if self._start_time is None: - raise RuntimeError('Timer has not started.') - dt = time_now() - self._start_time - self.ms = 1000.0 * dt # Convert to milliseconds. 
-        if send:
-            self.send()
-        return self
-
-    def send(self):
-        if self.ms is None:
-            raise RuntimeError('No data recorded.')
-        if self._sent:
-            raise RuntimeError('Already sent data.')
-        self._sent = True
-        self.client.timing(self.stat, self.ms, self.rate)
+if sys.version_info >= (3, 5):  # async/await syntax is only present on Py3.5+
+    from .async_timer import AsyncTimer as Timer
+else:
+    from .base_timer import Timer

From 031fc6678d72a66a0be222629227868315add551 Mon Sep 17 00:00:00 2001
From: Cameron Lee
Date: Tue, 12 Oct 2021 18:36:56 -0600
Subject: [PATCH 2/2] Resolves #119: Added ThreadStatsClient for safe use in async code.

---
 docs/async.rst            | 68 ++++++++++++++++++++++++++++++++++++++++
 docs/index.rst            |  1 +
 statsd/__init__.py        |  3 +-
 statsd/client/__init__.py |  2 ++
 statsd/client/thread.py   | 68 ++++++++++++++++++++++++++++++++++++++++
 5 files changed, 141 insertions(+), 1 deletion(-)
 create mode 100644 docs/async.rst
 create mode 100644 statsd/client/thread.py

diff --git a/docs/async.rst b/docs/async.rst
new file mode 100644
index 000000000..5506adb8b
--- /dev/null
+++ b/docs/async.rst
@@ -0,0 +1,68 @@
+.. _thread-client-chapter:
+
+=====================
+Async
+=====================
+
+
+Timing async functions
+===========================
+
+Timer decorators work with async functions automatically; just ``await`` them as usual. See :doc:`timing`.
+
+ThreadStatsClient
+===========================
+
+Both the UDP and TCP StatsClients perform potentially blocking network operations, so they generally
+aren't suitable for use alongside an event loop (although UDP sends are usually non-blocking and may
+meet your needs just fine). To safely record stats in asynchronous code, a thread-based client is
+provided. It wraps another StatsClient and performs all network operations on a single background thread.
+
+.. code-block:: python
+
+    from statsd import StatsClient, ThreadStatsClient
+
+    statsd = ThreadStatsClient(client=StatsClient())
+
+    # Send stats like normal, in sync or async code:
+
+    @statsd.timer('async_func')
+    async def async_func():
+        """Do something asynchronously"""
+
+
+    @statsd.timer('synchronous_func')
+    def synchronous_func():
+        """Do some quick synchronous work"""
+
+    async def main():
+        synchronous_func()
+        await async_func()
+        statsd.incr("main")
+
+    import trio
+    trio.run(main)
+
+    statsd.close()  # Make sure to flush the queue and stop the thread
+
+
+* The ``queue_size`` parameter controls how many metrics can be queued. The default is 1000.
+
+* The ``no_fail`` parameter controls what happens when the queue is full: ``True`` (the default) silently drops the metric, while ``False`` raises an exception.
+
+* The ``daemon`` parameter puts the background thread in daemon mode so you don't have to remember to close it. This prevents the background
+  thread from keeping the application running after the main thread returns, at the cost that any metrics still queued when the process exits are lost. This is only suitable when that's not a concern, e.g. for long-running services.
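+
+For example, a client that queues up to 5000 metrics, raises an error when the queue is full, and
+runs its background thread in daemon mode could be configured like this:
+
+.. code-block:: python
+
+    from statsd import StatsClient, ThreadStatsClient
+
+    statsd = ThreadStatsClient(
+        client=StatsClient(),  # the wrapped client that actually sends the stats
+        queue_size=5000,       # allow up to 5000 queued metrics
+        no_fail=False,         # raise instead of silently dropping when the queue is full
+        daemon=True,           # don't keep the process alive just for the stats thread
+    )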
diff --git a/docs/index.rst b/docs/index.rst
index 11f7aa4de..9e7fb096e 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -82,6 +82,7 @@ Contents
    pipeline.rst
    tcp.rst
    unix_socket.rst
+   async.rst
    reference.rst
    contributing.rst

diff --git a/statsd/__init__.py b/statsd/__init__.py
index 7c56a47d0..9f882843a 100644
--- a/statsd/__init__.py
+++ b/statsd/__init__.py
@@ -3,8 +3,9 @@
 from .client import StatsClient
 from .client import TCPStatsClient
 from .client import UnixSocketStatsClient
+from .client import ThreadStatsClient

 VERSION = (3, 2, 1)
 __version__ = '.'.join(map(str, VERSION))

-__all__ = ['StatsClient', 'TCPStatsClient', 'UnixSocketStatsClient']
+__all__ = ['StatsClient', 'TCPStatsClient', 'UnixSocketStatsClient', 'ThreadStatsClient']
diff --git a/statsd/client/__init__.py b/statsd/client/__init__.py
index 62cd202c7..49df7e191 100644
--- a/statsd/client/__init__.py
+++ b/statsd/client/__init__.py
@@ -1,4 +1,6 @@
 from __future__ import absolute_import, division, unicode_literals
+

 from .stream import TCPStatsClient, UnixSocketStatsClient  # noqa
 from .udp import StatsClient  # noqa
+from .thread import ThreadStatsClient  # noqa
diff --git a/statsd/client/thread.py b/statsd/client/thread.py
new file mode 100644
index 000000000..8b4bc8c2f
--- /dev/null
+++ b/statsd/client/thread.py
@@ -0,0 +1,68 @@
+from __future__ import absolute_import, division, unicode_literals
+
+import threading
+from collections import deque
+try:
+    from Queue import Queue, Full  # Python 2
+except ImportError:
+    from queue import Queue, Full  # Python 3
+
+from .base import StatsClientBase, PipelineBase
+
+
+class CLOSE(object):
+    pass
+
+
+class Pipeline(PipelineBase):
+    def _send(self):
+        self._client._send(self._stats)
+        # Hand the current deque to the background thread and start a fresh one:
+        # the thread should only send the stats that are in this pipeline right
+        # now, not stats that might be added to it before the thread picks up
+        # the deque we just sent.
+        self._client._stats = deque()
+
+
+class ThreadStatsClient(StatsClientBase):
+    def __init__(self, client, prefix=None, queue_size=1000, no_fail=True, daemon=False):
+        self._prefix = prefix
+        self._client = client  # The StatsClient instance we're wrapping.
+        self._client_pipeline = client.pipeline()
+
+        self._no_fail = no_fail
+        # Bound the queue so buffered metrics can't grow unbounded and use all the memory.
+        self._queue = Queue(maxsize=queue_size)
+        self._thread = threading.Thread(target=self._background_thread)
+        self._thread.daemon = daemon
+        self._thread.start()
+
+    def _send(self, data):
+        try:
+            self._queue.put(data, block=False)
+        except Full:
+            if self._no_fail:
+                # Queue is full; silently drop this metric.
+                pass
+            else:
+                raise
+
+    def close(self):
+        self._queue.put(CLOSE)
+        self._thread.join()
+
+    def _background_thread(self):
+        while True:
+            data = self._queue.get()
+            if data is CLOSE:
+                self._client.close()
+                break
+            elif isinstance(data, deque):
+                # We got a pipeline's stats; send them through the wrapped client's pipeline:
+                self._client_pipeline._stats = data
+                self._client_pipeline._send()
+            else:
+                self._client._send(data)
+
+    def pipeline(self):
+        return Pipeline(self)