From 2d123398683766a29adaf598d7ffa3900998066a Mon Sep 17 00:00:00 2001 From: Roma Frolov Date: Wed, 18 Sep 2024 00:10:14 +0300 Subject: [PATCH] edit message count in process & fix settings provider for Nats KV and Nats OS --- faststream/prometheus/middleware.py | 10 ++++++++-- tests/prometheus/basic.py | 8 +++++--- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/faststream/prometheus/middleware.py b/faststream/prometheus/middleware.py index 06d6b79005..cd7148ac40 100644 --- a/faststream/prometheus/middleware.py +++ b/faststream/prometheus/middleware.py @@ -97,6 +97,9 @@ async def consume_scope( call_next: "AsyncFuncAny", msg: "StreamMessage[Any]", ) -> Any: + if self._settings_provider is None: + return await call_next(msg) + messaging_system = self._settings_provider.messaging_system consume_attrs = self._settings_provider.get_consume_attrs_from_message(msg) destination_name = consume_attrs["destination_name"] @@ -116,7 +119,7 @@ async def consume_scope( self._metrics.received_messages_in_process.labels( broker=messaging_system, handler=destination_name, - ).inc() + ).inc(consume_attrs["messages_count"]) start_time = time.perf_counter() @@ -137,7 +140,7 @@ async def consume_scope( self._metrics.received_messages_in_process.labels( broker=messaging_system, handler=destination_name, - ).dec() + ).dec(consume_attrs["messages_count"]) status = ProcessingStatus.acked @@ -170,6 +173,9 @@ async def publish_scope( *args: Any, **kwargs: Any, ) -> Any: + if self._settings_provider is None: + return await call_next(msg, *args, **kwargs) + err: Optional[Exception] = None start_time = time.perf_counter() diff --git a/tests/prometheus/basic.py b/tests/prometheus/basic.py index ef58eb5201..0232829638 100644 --- a/tests/prometheus/basic.py +++ b/tests/prometheus/basic.py @@ -116,12 +116,12 @@ def assert_consume_metrics( broker=settings_provider.messaging_system, handler=consume_attrs["destination_name"], ), - call().inc(), + call().inc(consume_attrs["messages_count"]), call( broker=settings_provider.messaging_system, handler=consume_attrs["destination_name"], ), - call().dec(), + call().dec(consume_attrs["messages_count"]), ] assert metrics.received_messages_processing_time.labels.mock_calls == [ @@ -132,12 +132,14 @@ def assert_consume_metrics( call().observe(ANY), ] + status = ProcessingStatus.acked + if exception_class: status = ( PROCESSING_STATUS_BY_HANDLER_EXCEPTION_MAP.get(exception_class) or ProcessingStatus.error ) - else: + elif message.committed: status = PROCESSING_STATUS_BY_ACK_STATUS[message.committed] assert metrics.received_processed_messages.labels.mock_calls == [