From b800a35e2a203315d6ead65750ed4ba12c747336 Mon Sep 17 00:00:00 2001
From: Marco Neumann
Date: Mon, 4 Mar 2019 18:09:16 +0000
Subject: [PATCH] Fix double init of prometheus metrics (#2544)

Tornado handlers are initialized multiple times within the same python
interpreter process (once per request). Adding new prometheus
gauges/counters every time leads to `ValueError: Duplicated timeseries`
errors because `prometheus_client` has a single, global registry that
tracks all known metrics. Instead, use the collector mechanism, which is
the recommended way to generate metrics.

Also, enable the prometheus tests for the worker and add
`prometheus_client` to the official dev requirements.
---
 dev-requirements.txt                          |  1 +
 distributed/bokeh/scheduler_html.py           | 47 +++++++++++++++----
 .../bokeh/tests/test_scheduler_bokeh_html.py  | 20 ++++++--
 .../bokeh/tests/test_worker_bokeh_html.py     | 30 ++++++++++++
 distributed/bokeh/worker_html.py              | 47 +++++++++++++++----
 5 files changed, 124 insertions(+), 21 deletions(-)
 create mode 100644 distributed/bokeh/tests/test_worker_bokeh_html.py

diff --git a/dev-requirements.txt b/dev-requirements.txt
index 295fed1f22..7d684343ca 100644
--- a/dev-requirements.txt
+++ b/dev-requirements.txt
@@ -9,3 +9,4 @@ ipython >= 5.0.0
 jupyter_client >= 4.4.0
 ipykernel >= 4.5.2
 pytest >= 3.0.5
+prometheus_client >= 0.6.0
diff --git a/distributed/bokeh/scheduler_html.py b/distributed/bokeh/scheduler_html.py
index 10e84bd4b3..e8b030156d 100644
--- a/distributed/bokeh/scheduler_html.py
+++ b/distributed/bokeh/scheduler_html.py
@@ -163,24 +163,51 @@ def get(self):
         self.write(result)
 
 
+class _PrometheusCollector(object):
+    def __init__(self, server, prometheus_client):
+        self.server = server
+        self.prometheus_client = prometheus_client
+
+    def collect(self):
+        yield self.prometheus_client.core.GaugeMetricFamily(
+            'dask_scheduler_workers',
+            'Number of workers.',
+            value=len(self.server.workers),
+        )
+        yield self.prometheus_client.core.GaugeMetricFamily(
+            'dask_scheduler_clients',
+            'Number of clients.',
+            value=len(self.server.clients),
+        )
+
+
 class PrometheusHandler(RequestHandler):
+    _initialized = False
+
     def __init__(self, *args, **kwargs):
-        import prometheus_client # keep out of global namespace
+        import prometheus_client  # keep out of global namespace
         self.prometheus_client = prometheus_client
 
         super(PrometheusHandler, self).__init__(*args, **kwargs)
 
-        self.workers = self.prometheus_client.Gauge('workers_total',
-                                                    'Total number of workers.',
-                                                    namespace='scheduler')
-        self.clients = self.prometheus_client.Gauge('clients_total',
-                                                    'Total number of clients.',
-                                                    namespace='scheduler')
-    def get(self):
-        self.workers.set(len(self.server.workers))
-        self.clients.set(len(self.server.clients))
+        self._init()
+
+    def _init(self):
+        if PrometheusHandler._initialized:
+            return
+
+        self.prometheus_client.REGISTRY.register(
+            _PrometheusCollector(
+                self.server,
+                self.prometheus_client,
+            )
+        )
+        PrometheusHandler._initialized = True
+
+    def get(self):
         self.write(self.prometheus_client.generate_latest())
+        self.set_header('Content-Type', 'text/plain; version=0.0.4')
 
 
 routes = [
diff --git a/distributed/bokeh/tests/test_scheduler_bokeh_html.py b/distributed/bokeh/tests/test_scheduler_bokeh_html.py
index c2e5d3220e..5f8b55a531 100644
--- a/distributed/bokeh/tests/test_scheduler_bokeh_html.py
+++ b/distributed/bokeh/tests/test_scheduler_bokeh_html.py
@@ -67,7 +67,21 @@ def test_prefix(c, s, a, b):
              scheduler_kwargs={'services': {('bokeh', 0): BokehScheduler}})
 def test_prometheus(c, s, a, b):
     pytest.importorskip('prometheus_client')
+    from prometheus_client.parser import text_string_to_metric_families
+
     http_client = AsyncHTTPClient()
-    response = yield http_client.fetch('http://localhost:%d/metrics'
-                                       % s.services['bokeh'].port)
-    assert response.code == 200
+
+    # request data twice since there once was a case where metrics got
+    # registered multiple times, resulting in prometheus_client errors
+    for _ in range(2):
+        response = yield http_client.fetch('http://localhost:%d/metrics'
+                                           % s.services['bokeh'].port)
+        assert response.code == 200
+        assert response.headers['Content-Type'] == 'text/plain; version=0.0.4'
+
+        txt = response.body.decode('utf8')
+        families = {
+            family.name
+            for family in text_string_to_metric_families(txt)
+        }
+        assert 'dask_scheduler_workers' in families
diff --git a/distributed/bokeh/tests/test_worker_bokeh_html.py b/distributed/bokeh/tests/test_worker_bokeh_html.py
new file mode 100644
index 0000000000..0a56e97e05
--- /dev/null
+++ b/distributed/bokeh/tests/test_worker_bokeh_html.py
@@ -0,0 +1,30 @@
+import pytest
+pytest.importorskip('bokeh')
+
+from tornado.httpclient import AsyncHTTPClient
+from distributed.utils_test import gen_cluster
+from distributed.bokeh.worker import BokehWorker
+
+
+@gen_cluster(client=True,
+             worker_kwargs={'services': {('bokeh', 0): BokehWorker}})
+def test_prometheus(c, s, a, b):
+    pytest.importorskip('prometheus_client')
+    from prometheus_client.parser import text_string_to_metric_families
+
+    http_client = AsyncHTTPClient()
+
+    # request data twice since there once was a case where metrics got
+    # registered multiple times, resulting in prometheus_client errors
+    for _ in range(2):
+        response = yield http_client.fetch('http://localhost:%d/metrics'
+                                           % a.services['bokeh'].port)
+        assert response.code == 200
+        assert response.headers['Content-Type'] == 'text/plain; version=0.0.4'
+
+        txt = response.body.decode('utf8')
+        families = {
+            family.name
+            for family in text_string_to_metric_families(txt)
+        }
+        assert len(families) > 0
diff --git a/distributed/bokeh/worker_html.py b/distributed/bokeh/worker_html.py
index 4428d19957..a3c24b085f 100644
--- a/distributed/bokeh/worker_html.py
+++ b/distributed/bokeh/worker_html.py
@@ -14,22 +14,53 @@ def get_template_path(self):
         return os.path.join(dirname, 'templates')
 
 
+class _PrometheusCollector(object):
+    def __init__(self, server, prometheus_client):
+        self.server = server
+        self.prometheus_client = prometheus_client
+
+    def collect(self):
+        # add your metrics here:
+        #
+        # 1. remove the following lines
+        while False:
+            yield None
+        #
+        # 2. yield your metrics
+        # yield self.prometheus_client.core.GaugeMetricFamily(
+        #     'dask_worker_connections',
+        #     'Number of connections currently open.',
+        #     value=???,
+        # )
+
+
 class PrometheusHandler(RequestHandler):
+    _initialized = False
+
     def __init__(self, *args, **kwargs):
-        import prometheus_client # keep out of global namespace
+        import prometheus_client  # keep out of global namespace
         self.prometheus_client = prometheus_client
 
         super(PrometheusHandler, self).__init__(*args, **kwargs)
 
-        # Add metrics like this:
-        # self.workers = self.prometheus_client.Gauge('memory_bytes',
-        #                                             'Total memory.',
-        #                                             namespace='worker')
-    def get(self):
-        # Example metric update
-        # self.workers.set(0.)
+        self._init()
+
+    def _init(self):
+        if PrometheusHandler._initialized:
+            return
+
+        self.prometheus_client.REGISTRY.register(
+            _PrometheusCollector(
+                self.server,
+                self.prometheus_client,
+            )
+        )
+        PrometheusHandler._initialized = True
+
+    def get(self):
         self.write(self.prometheus_client.generate_latest())
+        self.set_header('Content-Type', 'text/plain; version=0.0.4')
 
 
 routes = [
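
For reference, a minimal standalone sketch of the prometheus_client collector pattern this patch switches to (not part of the diff; the class name AppStateCollector, the metric name, and the hard-coded worker count are illustrative assumptions). A collector is registered once per process and is asked for freshly built metric families on every scrape, so no Gauge/Counter objects are re-created per request and the global registry never sees a duplicate:

    # Illustrative sketch only -- names below do not appear in the patch.
    import prometheus_client
    from prometheus_client.core import GaugeMetricFamily


    class AppStateCollector(object):
        """Duck-typed collector: prometheus_client only needs a collect() method."""

        def __init__(self, get_worker_count):
            self.get_worker_count = get_worker_count

        def collect(self):
            # Called on every scrape; metric families are built fresh each time,
            # so nothing is ever added twice to the global registry.
            yield GaugeMetricFamily(
                'example_workers',
                'Number of workers (example).',
                value=self.get_worker_count(),
            )


    # Register exactly once per process. Creating a new Gauge/Counter with the
    # same name on every request would instead raise
    # "ValueError: Duplicated timeseries in CollectorRegistry".
    prometheus_client.REGISTRY.register(AppStateCollector(lambda: 4))
    print(prometheus_client.generate_latest().decode('utf8'))

The _initialized class flag in the handlers above serves the same purpose: it guards the one-time REGISTRY.register() call even though Tornado constructs a new handler instance for every request.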