Skip to content

Commit

Permalink
Fix double init of prometheus metrics (#2544)
Browse files Browse the repository at this point in the history
Tornado handlers are initaliazed multiple times within the same
python interpreter process (once per request). Adding new prometheus
gauges/counters every time leads to `ValueError: Duplicated timeseries`
errors because `prometheus_client` has one single, global registry to
track all known metrics. Instead, use the collector mechanism which is
the recommended way to generate metrics.

Also, enable prometheus tests for worker and add `prometheus_client` to
the official dev requirements.
  • Loading branch information
crepererum authored and mrocklin committed Mar 4, 2019
1 parent df4b70c commit b800a35
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 21 deletions.
1 change: 1 addition & 0 deletions dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ ipython >= 5.0.0
jupyter_client >= 4.4.0
ipykernel >= 4.5.2
pytest >= 3.0.5
prometheus_client >= 0.6.0
47 changes: 37 additions & 10 deletions distributed/bokeh/scheduler_html.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,24 +163,51 @@ def get(self):
self.write(result)


class _PrometheusCollector(object):
def __init__(self, server, prometheus_client):
self.server = server
self.prometheus_client = prometheus_client

def collect(self):
yield self.prometheus_client.core.GaugeMetricFamily(
'dask_scheduler_workers',
'Number of workers.',
value=len(self.server.workers),
)
yield self.prometheus_client.core.GaugeMetricFamily(
'dask_scheduler_clients',
'Number of clients.',
value=len(self.server.clients),
)


class PrometheusHandler(RequestHandler):
_initialized = False

def __init__(self, *args, **kwargs):
import prometheus_client # keep out of global namespace
import prometheus_client # keep out of global namespace
self.prometheus_client = prometheus_client

super(PrometheusHandler, self).__init__(*args, **kwargs)
self.workers = self.prometheus_client.Gauge('workers_total',
'Total number of workers.',
namespace='scheduler')
self.clients = self.prometheus_client.Gauge('clients_total',
'Total number of clients.',
namespace='scheduler')

def get(self):
self.workers.set(len(self.server.workers))
self.clients.set(len(self.server.clients))
self._init()

def _init(self):
if PrometheusHandler._initialized:
return

self.prometheus_client.REGISTRY.register(
_PrometheusCollector(
self.server,
self.prometheus_client,
)
)

PrometheusHandler._initialized = True

def get(self):
self.write(self.prometheus_client.generate_latest())
self.set_header('Content-Type', 'text/plain; version=0.0.4')


routes = [
Expand Down
20 changes: 17 additions & 3 deletions distributed/bokeh/tests/test_scheduler_bokeh_html.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,21 @@ def test_prefix(c, s, a, b):
scheduler_kwargs={'services': {('bokeh', 0): BokehScheduler}})
def test_prometheus(c, s, a, b):
pytest.importorskip('prometheus_client')
from prometheus_client.parser import text_string_to_metric_families

http_client = AsyncHTTPClient()
response = yield http_client.fetch('http://localhost:%d/metrics'
% s.services['bokeh'].port)
assert response.code == 200

# request data twice since there once was a case where metrics got registered multiple times resulting in
# prometheus_client errors
for _ in range(2):
response = yield http_client.fetch('http://localhost:%d/metrics'
% s.services['bokeh'].port)
assert response.code == 200
assert response.headers['Content-Type'] == 'text/plain; version=0.0.4'

txt = response.body.decode('utf8')
families = {
familiy.name
for familiy in text_string_to_metric_families(txt)
}
assert 'dask_scheduler_workers' in families
30 changes: 30 additions & 0 deletions distributed/bokeh/tests/test_worker_bokeh_html.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import pytest
pytest.importorskip('bokeh')

from tornado.httpclient import AsyncHTTPClient
from distributed.utils_test import gen_cluster
from distributed.bokeh.worker import BokehWorker


@gen_cluster(client=True,
worker_kwargs={'services': {('bokeh', 0): BokehWorker}})
def test_prometheus(c, s, a, b):
pytest.importorskip('prometheus_client')
from prometheus_client.parser import text_string_to_metric_families

http_client = AsyncHTTPClient()

# request data twice since there once was a case where metrics got registered multiple times resulting in
# prometheus_client errors
for _ in range(2):
response = yield http_client.fetch('http://localhost:%d/metrics'
% a.services['bokeh'].port)
assert response.code == 200
assert response.headers['Content-Type'] == 'text/plain; version=0.0.4'

txt = response.body.decode('utf8')
families = {
familiy.name
for familiy in text_string_to_metric_families(txt)
}
assert len(families) > 0
47 changes: 39 additions & 8 deletions distributed/bokeh/worker_html.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,53 @@ def get_template_path(self):
return os.path.join(dirname, 'templates')


class _PrometheusCollector(object):
def __init__(self, server, prometheus_client):
self.server = server
self.prometheus_client = prometheus_client

def collect(self):
# add your metrics here:
#
# 1. remove the following lines
while False:
yield None
#
# 2. yield your metrics
# yield self.prometheus_client.core.GaugeMetricFamily(
# 'dask_worker_connections',
# 'Number of connections currently open.',
# value=???,
# )


class PrometheusHandler(RequestHandler):
_initialized = False

def __init__(self, *args, **kwargs):
import prometheus_client # keep out of global namespace
import prometheus_client # keep out of global namespace
self.prometheus_client = prometheus_client

super(PrometheusHandler, self).__init__(*args, **kwargs)
# Add metrics like this:
# self.workers = self.prometheus_client.Gauge('memory_bytes',
# 'Total memory.',
# namespace='worker')

def get(self):
# Example metric update
# self.workers.set(0.)
self._init()

def _init(self):
if PrometheusHandler._initialized:
return

self.prometheus_client.REGISTRY.register(
_PrometheusCollector(
self.server,
self.prometheus_client,
)
)

PrometheusHandler._initialized = True

def get(self):
self.write(self.prometheus_client.generate_latest())
self.set_header('Content-Type', 'text/plain; version=0.0.4')


routes = [
Expand Down

0 comments on commit b800a35

Please sign in to comment.