Skip to content

Commit

Permalink
str statuses -> StrEnums
Browse files Browse the repository at this point in the history
  • Loading branch information
roma-frolov committed Sep 16, 2024
1 parent 9823e4b commit 09be4b4
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 18 deletions.
36 changes: 22 additions & 14 deletions faststream/prometheus/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
from prometheus_client import Counter, Gauge, Histogram

from faststream import BaseMiddleware
from faststream.broker.message import AckStatus
from faststream.exceptions import (
AckMessage,
NackMessage,
RejectMessage,
SkipMessage,
)
from faststream.prometheus.provider import MetricsSettingsProvider
from faststream.prometheus.types import ProcessingStatus, PublishingStatus

if TYPE_CHECKING: # pragma: no cover
from prometheus_client import CollectorRegistry
Expand Down Expand Up @@ -137,23 +139,22 @@ async def consume_scope(
handler=destination_name,
).dec()

status = "acked"
status = ProcessingStatus.acked

if msg.committed or err:
status = (
msg.committed.value
if msg.committed
else PROCESSING_STATUS_BY_HANDLER_EXCEPTION_MAP.get(type(err))
or "error"
PROCESSING_STATUS_BY_ACK_STATUS.get(msg.committed)
or PROCESSING_STATUS_BY_HANDLER_EXCEPTION_MAP.get(type(err))
or ProcessingStatus.error
)

self._metrics.received_processed_messages.labels(
broker=messaging_system,
handler=destination_name,
status=status,
status=status.value,
).inc()

if status == "error":
if status == ProcessingStatus.error:
self._metrics.messages_processing_exceptions.labels(
broker=messaging_system,
handler=destination_name,
Expand Down Expand Up @@ -195,16 +196,16 @@ async def publish_scope(
destination=destination_name,
).observe(duration)

status = "error" if err else "success"
status = PublishingStatus.error if err else PublishingStatus.success
messages_count = len((msg, *args))

self._metrics.published_messages.labels(
broker=messaging_system,
destination=destination_name,
status=status,
status=status.value,
).inc(messages_count)

if status == "error":
if status == PublishingStatus.error:
self._metrics.messages_publishing_exceptions.labels(
broker=messaging_system,
destination=destination_name,
Expand All @@ -215,10 +216,17 @@ async def publish_scope(


PROCESSING_STATUS_BY_HANDLER_EXCEPTION_MAP = {
AckMessage: "acked",
NackMessage: "nacked",
RejectMessage: "rejected",
SkipMessage: "skipped",
AckMessage: ProcessingStatus.acked,
NackMessage: ProcessingStatus.nacked,
RejectMessage: ProcessingStatus.rejected,
SkipMessage: ProcessingStatus.skipped,
}


PROCESSING_STATUS_BY_ACK_STATUS = {
AckStatus.acked: ProcessingStatus.acked,
AckStatus.nacked: ProcessingStatus.nacked,
AckStatus.rejected: ProcessingStatus.rejected,
}


Expand Down
14 changes: 14 additions & 0 deletions faststream/prometheus/types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from enum import StrEnum


class ProcessingStatus(StrEnum):
acked = "acked"
nacked = "nacked"
rejected = "rejected"
skipped = "skipped"
error = "error"


class PublishingStatus(StrEnum):
success = "success"
error = "error"
10 changes: 6 additions & 4 deletions tests/prometheus/basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
from faststream.broker.core.usecase import BrokerUsecase
from faststream.broker.message import AckStatus, StreamMessage
from faststream.prometheus.middleware import (
PROCESSING_STATUS_BY_ACK_STATUS,
PROCESSING_STATUS_BY_HANDLER_EXCEPTION_MAP,
BasePrometheusMiddleware,
)
from faststream.prometheus.types import ProcessingStatus
from tests.brokers.base.basic import BaseTestcaseConfig


Expand Down Expand Up @@ -133,21 +135,21 @@ def assert_consume_metrics(
if exception_class:
status = (
PROCESSING_STATUS_BY_HANDLER_EXCEPTION_MAP.get(exception_class)
or "error"
or ProcessingStatus.error
)
else:
status = message.committed.value
status = PROCESSING_STATUS_BY_ACK_STATUS[message.committed]

assert metrics.received_processed_messages.labels.mock_calls == [
call(
broker=settings_provider.messaging_system,
handler=consume_attrs["destination_name"],
status=status,
status=status.value,
),
call().inc(),
]

if status == "error":
if status == ProcessingStatus.error:
assert metrics.messages_processing_exceptions.labels.mock_calls == [
call(
broker=settings_provider.messaging_system,
Expand Down

0 comments on commit 09be4b4

Please sign in to comment.