-
Notifications
You must be signed in to change notification settings - Fork 19
/
graphyte.py
246 lines (210 loc) · 9.28 KB
/
graphyte.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
"""Send data to Graphite metrics server (synchronously or on a background thread).
For example usage, see README.rst.
This code is licensed under a permissive MIT license -- see LICENSE.txt.
The graphyte project lives on GitHub here:
https://github.com/benhoyt/graphyte
"""
import atexit
import logging
try:
import queue
except ImportError:
import Queue as queue # Python 2.x compatibility
import socket
import threading
import time
# Public names exported by "from graphyte import *".
__all__ = ['Sender', 'init', 'send']
__version__ = '1.7.1'
# Module-wide Sender used by the module-level send(); stays None until
# init() has been called.
default_sender = None
# Logger used to report send errors, full queues and (when enabled)
# successful sends.
logger = logging.getLogger(__name__)
def _has_whitespace(value):
return not value or value.split(None, 1)[0] != value
class Sender:
    """Format metrics as Graphite plaintext messages and send them.

    In synchronous mode ("interval" is None) every send() call opens a
    socket and transmits the message immediately. In interval mode a daemon
    background thread collects queued messages and sends them in batches
    about every "interval" seconds.
    """

    def __init__(self, host, port=2003, prefix=None, timeout=5, interval=None,
                 queue_size=None, log_sends=False, protocol='tcp',
                 batch_size=1000, tags=None, raise_send_errors=False):
        """Initialize a Sender instance, starting the background thread to
        send messages at given interval (in seconds) if "interval" is not
        None. Send at most "batch_size" messages per socket send operation.

        Default protocol is TCP; use protocol='udp' for UDP.

        Use "tags" to specify common or default tags for this Sender, which
        are sent with each metric along with any tags passed to send().

        Other arguments:
          host, port -- address of the Graphite server.
          prefix -- if set, prefix + '.' is prepended to every metric name.
          timeout -- timeout in seconds passed to the TCP connection (not
              used for UDP).
          queue_size -- maximum number of queued messages in interval mode;
              defaults to int(round(interval)) * 100.
          log_sends -- log every successful send at INFO level.
          raise_send_errors -- re-raise errors from sending instead of
              logging them; only allowed when "interval" is None.

        Raises ValueError if both "interval" and "raise_send_errors" are set.
        """
        # Validate before touching any instance state, so a failed
        # construction leaves nothing for __del__/stop() to clean up.
        if interval is not None and raise_send_errors:
            raise ValueError('raise_send_errors must be disabled when interval is set')

        self.host = host
        self.port = port
        self.prefix = prefix
        self.timeout = timeout
        self.interval = interval
        self.log_sends = log_sends
        self.protocol = protocol
        self.batch_size = batch_size
        # A fresh dict per instance: a shared mutable default would leak
        # tags from one Sender into every other Sender.
        self.tags = {} if tags is None else tags
        self.raise_send_errors = raise_send_errors

        if self.interval is not None:
            if queue_size is None:
                # NOTE: an interval that rounds to 0 gives maxsize 0, which
                # queue.Queue treats as an unbounded queue.
                queue_size = int(round(interval)) * 100
            self._queue = queue.Queue(maxsize=queue_size)
            self._thread = threading.Thread(target=self._thread_loop)
            self._thread.daemon = True
            self._thread.start()
            # Flush queued messages and stop the thread at interpreter exit.
            atexit.register(self.stop)

    def __del__(self):
        self.stop()

    def stop(self):
        """Tell the sender thread to finish and wait for it to stop sending
        (should be at most "timeout" seconds).

        Does nothing in synchronous mode, when already stopped, or when the
        instance was never fully initialized. Afterwards "interval" is None,
        so later send() calls are performed synchronously.
        """
        # getattr() keeps this safe when called via __del__ on an instance
        # whose __init__ raised before the attributes were assigned.
        if getattr(self, 'interval', None) is None:
            return
        worker = getattr(self, '_thread', None)
        if worker is not None and worker.is_alive():
            # The queue may be full; keep retrying while the worker is alive
            # and draining it, instead of failing with queue.Full.
            while worker.is_alive():
                try:
                    self._queue.put(None, timeout=0.1)
                except queue.Full:
                    continue
                break
            worker.join()
        self.interval = None

    def build_message(self, metric, value, timestamp, tags=None):
        """Build a Graphite message to send and return it as a byte string.

        The message has the form "<prefix.>metric;tag=value... value
        timestamp" followed by a newline, encoded as UTF-8. Tags are the
        Sender's default tags updated with "tags", sorted by key; the
        timestamp is rounded to an integer.

        Raises ValueError if the metric or any tag key/value is empty or
        contains whitespace, and TypeError if "value" is not an int or float.
        """
        if _has_whitespace(metric):
            raise ValueError('"metric" must not have whitespace in it')
        if not isinstance(value, (int, float)):
            raise TypeError('"value" must be an int or a float, not a {}'.format(
                type(value).__name__))

        # Work on a copy so per-call tags never alter the Sender's defaults.
        all_tags = self.tags.copy()
        if tags:
            all_tags.update(tags)
        tags_strs = [u';{}={}'.format(k, v) for k, v in sorted(all_tags.items())]
        if any(_has_whitespace(t) for t in tags_strs):
            raise ValueError('"tags" keys and values must not have whitespace in them')
        tags_suffix = ''.join(tags_strs)

        message = u'{}{}{} {} {}\n'.format(
            self.prefix + '.' if self.prefix else '',
            metric,
            tags_suffix,
            value,
            int(round(timestamp))
        )
        return message.encode('utf-8')

    def send(self, metric, value, timestamp=None, tags=None):
        """Send given metric and (int or float) value to Graphite host.
        Performs send on background thread if "interval" was specified when
        creating this Sender.

        If "timestamp" is None the current time is used.

        If a "tags" dict is specified, send the tags to the Graphite host along
        with the metric, in addition to any default tags passed to Sender() --
        the tags argument here overrides any default tags.

        In interval mode the message is dropped (and an error logged) if the
        queue is full.
        """
        if timestamp is None:
            timestamp = time.time()
        if tags is None:
            # Always hand build_message() a dict, so subclasses overriding it
            # keep receiving the type they did before.
            tags = {}
        message = self.build_message(metric, value, timestamp, tags=tags)

        if self.interval is None:
            self.send_socket(message)
        else:
            try:
                self._queue.put_nowait(message)
            except queue.Full:
                logger.error('queue full when sending {!r}'.format(message))

    def send_message(self, message):
        """Open a new socket for the configured protocol, send the message
        bytes to (host, port) and close the socket again.

        Raises ValueError if "protocol" is neither 'tcp' nor 'udp'; socket
        errors propagate to the caller.
        """
        if self.protocol == 'tcp':
            sock = socket.create_connection((self.host, self.port), self.timeout)
            try:
                # Inside the try block so the socket is closed even if
                # setting the option fails.
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                sock.sendall(message)
            finally:  # sockets don't support "with" statement on Python 2.x
                sock.close()
        elif self.protocol == 'udp':
            sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            try:
                sock.sendto(message, (self.host, self.port))
            finally:
                sock.close()
        else:
            raise ValueError('"protocol" must be \'tcp\' or \'udp\', not {!r}'.format(self.protocol))

    def send_socket(self, message):
        """Low-level function to send message bytes to this Sender's socket.
        You should usually call send() instead of this function (unless you're
        subclassing or writing unit tests).

        Errors from sending are logged, or re-raised if "raise_send_errors"
        is set. Successful sends are logged with their duration if
        "log_sends" is set.
        """
        if self.log_sends:
            start_time = time.time()
        try:
            self.send_message(message)
        except Exception as error:
            if self.raise_send_errors:
                raise
            logger.error('error sending message {!r}: {}'.format(message, error))
        else:
            if self.log_sends:
                elapsed_time = time.time() - start_time
                logger.info('sent message {!r} to {}:{} in {:.03f} seconds'.format(
                    message, self.host, self.port, elapsed_time))

    def _send_batches(self, messages):
        """Send messages in chunks of at most "batch_size" per socket send."""
        for start in range(0, len(messages), self.batch_size):
            batch = messages[start:start + self.batch_size]
            self.send_socket(b''.join(batch))

    def _thread_loop(self):
        """Background thread used when Sender is in asynchronous/interval mode."""
        last_check_time = time.time()
        messages = []
        while True:
            # Get first message from queue, blocking until the next time we
            # should be sending
            time_since_last_check = time.time() - last_check_time
            time_till_next_check = max(0, self.interval - time_since_last_check)
            try:
                message = self._queue.get(timeout=time_till_next_check)
            except queue.Empty:
                pass
            else:
                if message is None:
                    # None is the signal to stop this background thread
                    break
                messages.append(message)

                # Get any other messages currently on queue without blocking,
                # paying attention to None ("stop thread" signal)
                should_stop = False
                while True:
                    try:
                        message = self._queue.get_nowait()
                    except queue.Empty:
                        break
                    if message is None:
                        should_stop = True
                        break
                    messages.append(message)
                if should_stop:
                    break

            # If it's time to send, send what we've collected
            current_time = time.time()
            if current_time - last_check_time >= self.interval:
                last_check_time = current_time
                self._send_batches(messages)
                messages = []

        # Send any final messages before exiting thread
        self._send_batches(messages)
def init(*args, **kwargs):
    """Create a Sender and install it as the module-wide default.

    All positional and keyword arguments are forwarded unchanged to
    Sender(). The new instance replaces any previous default sender.
    """
    global default_sender
    new_sender = Sender(*args, **kwargs)
    default_sender = new_sender
def send(*args, **kwargs):
    """Send a metric through the module-wide default Sender.

    All arguments are forwarded unchanged to the default sender's send()
    method.
    """
    sender = default_sender
    sender.send(*args, **kwargs)
if __name__ == '__main__':
    # Command-line entry point: send a single metric value to a Graphite
    # server using a synchronous Sender.
    import argparse

    cli = argparse.ArgumentParser()
    cli.add_argument('metric', help='name of metric to send')
    cli.add_argument('value', type=float, help='numeric value to send')
    cli.add_argument(
        '-s', '--server', default='localhost',
        help='hostname of Graphite server to send to, default %(default)s')
    cli.add_argument(
        '-p', '--port', type=int, default=2003,
        help='port to send message to, default %(default)d')
    cli.add_argument(
        '-u', '--udp', action='store_true',
        help='send via UDP instead of TCP')
    cli.add_argument(
        '-t', '--timestamp', type=int,
        help='Unix timestamp for message (defaults to current time)')
    cli.add_argument(
        '-q', '--quiet', action='store_true',
        help="quiet mode (don't log send to stdout)")
    options = cli.parse_args()

    # Unless quiet mode is requested, log the send via the logging module.
    verbose = not options.quiet
    if verbose:
        logging.basicConfig(level=logging.INFO, format='%(message)s')

    chosen_protocol = 'udp' if options.udp else 'tcp'
    cli_sender = Sender(options.server, port=options.port, log_sends=verbose,
                        protocol=chosen_protocol)
    cli_sender.send(options.metric, options.value, timestamp=options.timestamp)