From 29a792a9911e1856cdf4ea19ed7e088422e963ce Mon Sep 17 00:00:00 2001 From: Nick Dichev Date: Thu, 13 Jul 2023 19:54:55 -0700 Subject: [PATCH 1/3] Register prometheus metrics with multiprocess collector --- taskiq/middlewares/prometheus_middleware.py | 22 +++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/taskiq/middlewares/prometheus_middleware.py b/taskiq/middlewares/prometheus_middleware.py index c3968cf..3de0e89 100644 --- a/taskiq/middlewares/prometheus_middleware.py +++ b/taskiq/middlewares/prometheus_middleware.py @@ -42,36 +42,50 @@ def __init__( logger.debug("Initializing metrics") try: - from prometheus_client import Counter, Histogram # noqa: WPS433 + from prometheus_client import ( # noqa: WPS433 + CollectorRegistry, + Counter, + Histogram, + multiprocess, + ) except ImportError as exc: raise ImportError( "Cannot initialize metrics. Please install 'taskiq[metrics]'.", ) from exc + registry = CollectorRegistry() + multiprocess.MultiProcessCollector(registry) + self.registry = registry + self.found_errors = Counter( "found_errors", "Number of found errors", ["task_name"], + registry=registry, ) self.received_tasks = Counter( "received_tasks", "Number of received tasks", ["task_name"], + registry=registry, ) self.success_tasks = Counter( "success_tasks", "Number of successfully executed tasks", ["task_name"], + registry=registry, ) self.saved_results = Counter( "saved_results", "Number of saved results in result backend", ["task_name"], + registry=registry, ) self.execution_time = Histogram( "execution_time", "Tome of function execution", ["task_name"], + registry=registry, ) self.server_port = server_port self.server_addr = server_addr @@ -87,7 +101,11 @@ def startup(self) -> None: if self.broker.is_worker_process: try: - start_http_server(port=self.server_port, addr=self.server_addr) + start_http_server( + port=self.server_port, + addr=self.server_addr, + registry=self.registry, + ) except OSError as exc: logger.debug("Cannot start prometheus server: %s", exc) From ef642285276b829de1826cc37be5f53350cefa55 Mon Sep 17 00:00:00 2001 From: Nick Dichev Date: Thu, 13 Jul 2023 20:30:38 -0700 Subject: [PATCH 2/3] Fix typo --- taskiq/middlewares/prometheus_middleware.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/taskiq/middlewares/prometheus_middleware.py b/taskiq/middlewares/prometheus_middleware.py index 3de0e89..2d8f797 100644 --- a/taskiq/middlewares/prometheus_middleware.py +++ b/taskiq/middlewares/prometheus_middleware.py @@ -83,7 +83,7 @@ def __init__( ) self.execution_time = Histogram( "execution_time", - "Tome of function execution", + "Time of function execution", ["task_name"], registry=registry, ) From d83fdfeb9368cc783b11ba286dd2cfec7658f6c0 Mon Sep 17 00:00:00 2001 From: Nick Dichev Date: Tue, 18 Jul 2023 16:58:10 -0700 Subject: [PATCH 3/3] Add ability to pass metrics registry, falling back to default REGISTRY --- taskiq/middlewares/prometheus_middleware.py | 42 ++++++++++----------- 1 file changed, 20 insertions(+), 22 deletions(-) diff --git a/taskiq/middlewares/prometheus_middleware.py b/taskiq/middlewares/prometheus_middleware.py index 2d8f797..bbc78f5 100644 --- a/taskiq/middlewares/prometheus_middleware.py +++ b/taskiq/middlewares/prometheus_middleware.py @@ -10,6 +10,18 @@ logger = getLogger("taskiq.prometheus") +try: + from prometheus_client import ( # noqa: WPS433 + REGISTRY, + CollectorRegistry, + Counter, + Histogram, + ) +except ImportError as imp_exc: + raise ImportError( + "Cannot initialize metrics. Please install 'taskiq[metrics]'.", + ) from imp_exc + class PrometheusMiddleware(TaskiqMiddleware): """ @@ -24,6 +36,7 @@ class PrometheusMiddleware(TaskiqMiddleware): def __init__( self, metrics_path: Optional[Path] = None, + metrics_registry: CollectorRegistry = REGISTRY, server_port: int = 9000, server_addr: str = "0.0.0.0", # noqa: S104 ) -> None: @@ -41,52 +54,37 @@ def __init__( logger.debug("Initializing metrics") - try: - from prometheus_client import ( # noqa: WPS433 - CollectorRegistry, - Counter, - Histogram, - multiprocess, - ) - except ImportError as exc: - raise ImportError( - "Cannot initialize metrics. Please install 'taskiq[metrics]'.", - ) from exc - - registry = CollectorRegistry() - multiprocess.MultiProcessCollector(registry) - self.registry = registry - self.found_errors = Counter( "found_errors", "Number of found errors", ["task_name"], - registry=registry, + registry=metrics_registry, ) self.received_tasks = Counter( "received_tasks", "Number of received tasks", ["task_name"], - registry=registry, + registry=metrics_registry, ) self.success_tasks = Counter( "success_tasks", "Number of successfully executed tasks", ["task_name"], - registry=registry, + registry=metrics_registry, ) self.saved_results = Counter( "saved_results", "Number of saved results in result backend", ["task_name"], - registry=registry, + registry=metrics_registry, ) self.execution_time = Histogram( "execution_time", "Time of function execution", ["task_name"], - registry=registry, + registry=metrics_registry, ) + self.metrics_registry = metrics_registry self.server_port = server_port self.server_addr = server_addr @@ -104,7 +102,7 @@ def startup(self) -> None: start_http_server( port=self.server_port, addr=self.server_addr, - registry=self.registry, + registry=self.metrics_registry, ) except OSError as exc: logger.debug("Cannot start prometheus server: %s", exc)