Commit

confluent prometheus
roma-frolov committed Sep 16, 2024
1 parent 554e31c commit d77f8ab
Showing 5 changed files with 159 additions and 0 deletions.
0 changes: 0 additions & 0 deletions faststream/confluent/prometheus/__init__.py
Empty file.
19 changes: 19 additions & 0 deletions faststream/confluent/prometheus/middleware.py
@@ -0,0 +1,19 @@
from typing import TYPE_CHECKING

from faststream.confluent.prometheus.provider import settings_provider_factory
from faststream.prometheus.middleware import BasePrometheusMiddleware

if TYPE_CHECKING:
from prometheus_client import CollectorRegistry


class KafkaPrometheusMiddleware(BasePrometheusMiddleware):
def __init__(
self,
*,
registry: "CollectorRegistry",
):
super().__init__(
settings_provider_factory=settings_provider_factory,
registry=registry,
)
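
The middleware layer carries no Kafka-specific logic of its own: it binds settings_provider_factory to the generic BasePrometheusMiddleware and only requires a prometheus_client CollectorRegistry. A minimal usage sketch, assuming a standard FastStream application layout (the bootstrap server address and the ASGI metrics mount are illustrative assumptions, not part of this commit):

from prometheus_client import CollectorRegistry, make_asgi_app

from faststream import FastStream
from faststream.confluent import KafkaBroker
from faststream.confluent.prometheus.middleware import KafkaPrometheusMiddleware

registry = CollectorRegistry()

broker = KafkaBroker(
    "localhost:9092",  # placeholder bootstrap server
    middlewares=(KafkaPrometheusMiddleware(registry=registry),),
)
app = FastStream(broker)

# The populated registry can then be exposed over HTTP, e.g. as an ASGI app.
metrics_app = make_asgi_app(registry)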
62 changes: 62 additions & 0 deletions faststream/confluent/prometheus/provider.py
@@ -0,0 +1,62 @@
from typing import TYPE_CHECKING, Sequence, Tuple, Union

from faststream.broker.message import MsgType, StreamMessage
from faststream.prometheus.provider import (
ConsumeAttrs,
MetricsSettingsProvider,
)

if TYPE_CHECKING:
from confluent_kafka import Message

from faststream.types import AnyDict


class BaseConfluentMetricsSettingsProvider(MetricsSettingsProvider[MsgType]):
def __init__(self):
self.messaging_system = "kafka"

def get_publish_destination_name_from_kwargs(
self,
kwargs: "AnyDict",
) -> str:
return kwargs["topic"]


class ConfluentMetricsSettingsProvider(BaseConfluentMetricsSettingsProvider["Message"]):
def get_consume_attrs_from_message(
self,
msg: "StreamMessage[Message]",
) -> ConsumeAttrs:
return {
"destination_name": msg.raw_message.topic(),
"message_size": len(msg.body),
"messages_count": 1,
}


class BatchConfluentMetricsSettingsProvider(
BaseConfluentMetricsSettingsProvider[Tuple["Message", ...]]
):
def get_consume_attrs_from_message(
self,
msg: "StreamMessage[Tuple[Message, ...]]",
) -> ConsumeAttrs:
raw_message = msg.raw_message[0]
return {
"destination_name": raw_message.topic(),
"message_size": len(bytearray().join(msg.body)),
"messages_count": len(msg.raw_message),
}


def settings_provider_factory(
msg: Union["Message", Sequence["Message"], None],
) -> Union[
ConfluentMetricsSettingsProvider,
BatchConfluentMetricsSettingsProvider,
]:
if isinstance(msg, Sequence):
return BatchConfluentMetricsSettingsProvider()
else:
return ConfluentMetricsSettingsProvider()
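
The factory dispatches purely on the shape of the raw message: a Sequence of confluent messages (a batch subscriber) selects the batch provider, while anything else, including the None allowed by the signature, falls back to the single-message provider. A short illustrative check of that dispatch, using stand-in values rather than real confluent_kafka.Message objects:

# Illustrative only: shows which provider the factory returns.
single_provider = settings_provider_factory(None)
assert isinstance(single_provider, ConfluentMetricsSettingsProvider)

batch_provider = settings_provider_factory(())  # any Sequence, even an empty tuple, selects the batch provider
assert isinstance(batch_provider, BatchConfluentMetricsSettingsProvider)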
3 changes: 3 additions & 0 deletions tests/prometheus/confluent/__init__.py
@@ -0,0 +1,3 @@
import pytest

pytest.importorskip("confluent_kafka")
75 changes: 75 additions & 0 deletions tests/prometheus/confluent/test_confluent.py
@@ -0,0 +1,75 @@
import asyncio
from unittest.mock import Mock

import pytest
from prometheus_client import CollectorRegistry

from faststream.confluent import KafkaBroker, KafkaMessage
from faststream.confluent.prometheus.middleware import KafkaPrometheusMiddleware
from tests.brokers.confluent.test_consume import TestConsume
from tests.brokers.confluent.test_publish import TestPublish
from tests.prometheus.basic import LocalPrometheusTestcase


@pytest.mark.kafka
class TestPrometheus(LocalPrometheusTestcase):
broker_class = KafkaBroker
middleware_class = KafkaPrometheusMiddleware
message_class = KafkaMessage

async def test_metrics_batch(
self,
event: asyncio.Event,
queue: str,
):
middleware = self.middleware_class(registry=CollectorRegistry())
metrics_mock = Mock()
middleware._metrics = metrics_mock

broker = self.broker_class(middlewares=(middleware,))

args, kwargs = self.get_subscriber_params(queue, batch=True)

message_class = self.message_class
message = None

@broker.subscriber(*args, **kwargs)
async def handler(m: message_class):
event.set()

nonlocal message
message = m

async with broker:
await broker.start()
tasks = (
asyncio.create_task(
broker.publish_batch("hello", "world", topic=queue)
),
asyncio.create_task(event.wait()),
)
await asyncio.wait(tasks, timeout=self.timeout)

assert event.is_set()
self.assert_consume_metrics(
metrics=metrics_mock, message=message, exception_class=None
)
self.assert_publish_metrics(metrics=metrics_mock)


@pytest.mark.kafka
class TestPublishWithPrometheus(TestPublish):
def get_broker(self, apply_types: bool = False):
return KafkaBroker(
middlewares=(KafkaPrometheusMiddleware(registry=CollectorRegistry()),),
apply_types=apply_types,
)


@pytest.mark.kafka
class TestConsumeWithPrometheus(TestConsume):
def get_broker(self, apply_types: bool = False):
return KafkaBroker(
middlewares=(KafkaPrometheusMiddleware(registry=CollectorRegistry()),),
apply_types=apply_types,
)
