Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async support. #149

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions docs/async.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
.. _thread-client-chapter:

=====================
Async
=====================


Timing async functions.
===========================

Timers work with async functions automatically by `await`ing them. See :ref:`Using Timers <using-a-decorator>`.

ThreadStatsClient
===========================

Both the UDP and TCP StatsClients perform potentially-blocking network operations so likely aren't suitable for using
alongside an event loop (although UDP is generally non-blocking so might meet your needs just fine). To safely
records stats in asynchronous code, a thread-based StatsClient is provided. This wraps another StatsClient and
uses a single background thread for the network operations.

.. code-block:: python

from statsd import StatsClient, ThreadStatsClient

statsd = ThreadStatsClient(client=StatsClient())

# Send stats like normal, in sync or async code:

@statsd.timer('async_func')
async def async_func():
"""Do something asynchronously"""


@statsd.timer('synchronous_func')
def synchronous_func():
"""Do something quick logic"""

async def main():
synchronous_func()
await async_func()
statsd.incr("main")

import trio
trio.run(main)

statsd.close() # Make sure to flush the queue and stop the thread


* The ``queue_size`` parameter controls how many metrics can queue up. Default is 1000.

* The ``no_fail`` parameter controls whether a full queue raises an exception (`True`: default) or simply drops the metric (specify `False`).

* The ``daemon`` parameter can be used to put the background thread in daemon mode so you don't have to remember to close it. This will prevent
the background thread from keeping the application running when the main thread returns. The cost is that any metrics still in the queue will be lost. This is only suitable ex. for long-running services where that's not a concern.
1 change: 1 addition & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ Contents
pipeline.rst
tcp.rst
unix_socket.rst
async.rst
reference.rst
contributing.rst

Expand Down
20 changes: 20 additions & 0 deletions docs/timing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,26 @@ execute will be sent to the statsd server.
myfunc(1, 2)
myfunc(3, 7)

This works with async functions too (Python 3.5+). It measures the total elapsed time
from when the function is called to when it completes.

.. code-block:: python

from statsd import StatsClient

statsd = StatsClient()

@statsd.timer('myfunc')
async def myfunc(a, b):
"""Do asynchronous IO for a and b"""

async def main():
# Timing is measured even while myfunc yields and other coroutines are running.
await myfunc(1, 2)

# Run main() using your choice of event loop here (asyncio, trio, etc...):
import trio
trio.run(main)


.. _timer-object:
Expand Down
3 changes: 2 additions & 1 deletion statsd/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
from .client import StatsClient
from .client import TCPStatsClient
from .client import UnixSocketStatsClient
from .client import ThreadStatsClient


VERSION = (3, 2, 1)
__version__ = '.'.join(map(str, VERSION))
__all__ = ['StatsClient', 'TCPStatsClient', 'UnixSocketStatsClient']
__all__ = ['StatsClient', 'TCPStatsClient', 'UnixSocketStatsClient', 'ThreadStatsClient']
2 changes: 2 additions & 0 deletions statsd/client/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from __future__ import absolute_import, division, unicode_literals


from .stream import TCPStatsClient, UnixSocketStatsClient # noqa
from .udp import StatsClient # noqa
from .thread import ThreadStatsClient
17 changes: 17 additions & 0 deletions statsd/client/async_timer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from inspect import iscoroutinefunction
from .base_timer import Timer, time_now, safe_wraps

class AsyncTimer(Timer):
def __call__(self, f):
if iscoroutinefunction(f):
@safe_wraps(f)
async def _wrapped(*args, **kwargs):
start_time = time_now()
try:
return await f(*args, **kwargs)
finally:
elapsed_time_ms = 1000.0 * (time_now() - start_time)
self.client.timing(self.stat, elapsed_time_ms, self.rate)
return _wrapped
else:
return super().__call__(f)
71 changes: 71 additions & 0 deletions statsd/client/base_timer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
from __future__ import absolute_import, division, unicode_literals

import functools

# Use timer that's not susceptible to time of day adjustments.
try:
# perf_counter is only present on Py3.3+
from time import perf_counter as time_now
except ImportError:
# fall back to using time
from time import time as time_now


def safe_wraps(wrapper, *args, **kwargs):
"""Safely wraps partial functions."""
while isinstance(wrapper, functools.partial):
wrapper = wrapper.func
return functools.wraps(wrapper, *args, **kwargs)


class Timer(object):
"""A context manager/decorator for statsd.timing()."""

def __init__(self, client, stat, rate=1):
self.client = client
self.stat = stat
self.rate = rate
self.ms = None
self._sent = False
self._start_time = None

def __call__(self, f):
"""Thread-safe timing function decorator."""
@safe_wraps(f)
def _wrapped(*args, **kwargs):
start_time = time_now()
try:
return f(*args, **kwargs)
finally:
elapsed_time_ms = 1000.0 * (time_now() - start_time)
self.client.timing(self.stat, elapsed_time_ms, self.rate)
return _wrapped

def __enter__(self):
return self.start()

def __exit__(self, typ, value, tb):
self.stop()

def start(self):
self.ms = None
self._sent = False
self._start_time = time_now()
return self

def stop(self, send=True):
if self._start_time is None:
raise RuntimeError('Timer has not started.')
dt = time_now() - self._start_time
self.ms = 1000.0 * dt # Convert to milliseconds.
if send:
self.send()
return self

def send(self):
if self.ms is None:
raise RuntimeError('No data recorded.')
if self._sent:
raise RuntimeError('Already sent data.')
self._sent = True
self.client.timing(self.stat, self.ms, self.rate)
68 changes: 68 additions & 0 deletions statsd/client/thread.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
from __future__ import absolute_import, division, unicode_literals

import threading
from collections import deque
try:
from Queue import Queue, Full # Python 2
except ImportError:
from queue import Queue, Full # Python 3

from .base import StatsClientBase, PipelineBase


class CLOSE(object):
pass


class Pipeline(PipelineBase):
def _send(self):
self._client._send(self._stats)
# Clearing the deque by making a new one: we only want the thread to send
# stats that exist in the queue as of right now, not those that might be
# added to this pipeline before the thread picks up the _stats deque we just
# sent it:
self._client._stats = deque()


class ThreadStatsClient(StatsClientBase):
def __init__(self, client, prefix=None, queue_size=1000, no_fail=True, daemon=False):
self._prefix = prefix
self._client = client # The StatsClient instance we're wrapping.
self._client_pipeline = client.pipeline()

self._no_fail = no_fail
self._queue = Queue(maxsize=queue_size) # Don't allow too much data to be
# buffered or we could grow unbounded and use all the memory.
self._thread = threading.Thread(target=self._background_thread)
self._thread.daemon = daemon
self._thread.start()

def _send(self, data):
try:
self._queue.put(data, block=False)
except Full:
if self._no_fail:
# No time for love, Dr. Jones!
pass
else:
raise

def close(self):
self._queue.put(CLOSE)
self._thread.join()

def _background_thread(self):
while True:
data = self._queue.get()
if data == CLOSE:
self._client.close()
break
elif isinstance(data, deque):
# We got a pipeline's data, using the wrapped client's pipeline to send:
self._client_pipeline._stats = data
self._client_pipeline._send()
else:
self._client._send(data)

def pipeline(self):
return Pipeline(self)
76 changes: 6 additions & 70 deletions statsd/client/timer.py
Original file line number Diff line number Diff line change
@@ -1,71 +1,7 @@
from __future__ import absolute_import, division, unicode_literals
# Only including the async functionality in timer if on a Python version with coroutines
import sys

import functools

# Use timer that's not susceptible to time of day adjustments.
try:
# perf_counter is only present on Py3.3+
from time import perf_counter as time_now
except ImportError:
# fall back to using time
from time import time as time_now


def safe_wraps(wrapper, *args, **kwargs):
"""Safely wraps partial functions."""
while isinstance(wrapper, functools.partial):
wrapper = wrapper.func
return functools.wraps(wrapper, *args, **kwargs)


class Timer(object):
"""A context manager/decorator for statsd.timing()."""

def __init__(self, client, stat, rate=1):
self.client = client
self.stat = stat
self.rate = rate
self.ms = None
self._sent = False
self._start_time = None

def __call__(self, f):
"""Thread-safe timing function decorator."""
@safe_wraps(f)
def _wrapped(*args, **kwargs):
start_time = time_now()
try:
return f(*args, **kwargs)
finally:
elapsed_time_ms = 1000.0 * (time_now() - start_time)
self.client.timing(self.stat, elapsed_time_ms, self.rate)
return _wrapped

def __enter__(self):
return self.start()

def __exit__(self, typ, value, tb):
self.stop()

def start(self):
self.ms = None
self._sent = False
self._start_time = time_now()
return self

def stop(self, send=True):
if self._start_time is None:
raise RuntimeError('Timer has not started.')
dt = time_now() - self._start_time
self.ms = 1000.0 * dt # Convert to milliseconds.
if send:
self.send()
return self

def send(self):
if self.ms is None:
raise RuntimeError('No data recorded.')
if self._sent:
raise RuntimeError('Already sent data.')
self._sent = True
self.client.timing(self.stat, self.ms, self.rate)
if sys.version_info >= (3, 5): # async/await syntax is only present on Py3.5+
from .async_timer import AsyncTimer as Timer
else:
from .base_timer import Timer