refactor: optimize confluent a bit
Lancetnik committed Aug 5, 2024
1 parent 2a7a875 commit b49ed38
Showing 3 changed files with 30 additions and 54 deletions.
4 changes: 2 additions & 2 deletions .devcontainer/docker-compose.yaml
@@ -1,15 +1,15 @@
 version: '3'

 services:
-  python-3.12-faststream-studio:
+  python-3.12-faststream-studio: # nosemgrep
     image: mcr.microsoft.com/devcontainers/python:3.12
     container_name: python-3.12-faststream-studio
     volumes:
       - ../:/workspaces/faststream:cached
     command: sleep infinity
     network_mode: "host"

-  kafka-faststream:
+  kafka-faststream: # nosemgrep
     image: bitnami/kafka:3.5.0
     container_name: kafka-faststream
     ports:
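The only change in this file is the inline `# nosemgrep` markers: Semgrep suppresses findings on lines annotated this way, so whichever rule was flagging these dev-container service definitions is silenced rather than fixed, which is the usual trade-off for development-only infrastructure.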
1 change: 0 additions & 1 deletion faststream/confluent/broker/broker.py
@@ -359,7 +359,6 @@ def __init__(
             metadata_max_age_ms=metadata_max_age_ms,
             allow_auto_create_topics=allow_auto_create_topics,
             connections_max_idle_ms=connections_max_idle_ms,
-            loop=loop,
             # publisher args
             acks=acks,
             compression_type=compression_type,
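The dropped `loop=loop` keyword tracks the removal of the `loop` parameter from `AsyncConfluentProducer` and `AsyncConfluentConsumer` in `faststream/confluent/client.py` below: the broker no longer threads an event loop through to the client layer.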
79 changes: 28 additions & 51 deletions faststream/confluent/client.py
@@ -14,15 +14,14 @@

 from confluent_kafka import Consumer, KafkaError, KafkaException, Message, Producer
 from confluent_kafka.admin import AdminClient, NewTopic
-from pydantic import BaseModel
 from typing_extensions import Annotated, Doc

 from faststream.confluent.config import ConfluentConfig
 from faststream.log import logger
+from faststream.utils.functions import call_or_await

 if TYPE_CHECKING:
-    from faststream.types import LoggerProto
+    from faststream.types import AnyDict, LoggerProto

 _missing = object()

@@ -40,28 +39,12 @@
 )


-class MsgToSend(BaseModel):
-    """A Pydantic model representing a message to be sent to Kafka.
-
-    Attributes:
-        timestamp (int): The timestamp of the message.
-        key (Optional[Union[str, bytes]]): The key of the message, can be a string or bytes.
-        value (Optional[Union[str, bytes]]): The value of the message, can be a string or bytes.
-        headers (List[Tuple[str, bytes]]): A list of headers associated with the message.
-    """
-
-    timestamp: int
-    key: Optional[Union[str, bytes]]
-    value: Optional[Union[str, bytes]]
-    headers: List[Tuple[str, bytes]]
-
-
 class BatchBuilder:
     """A helper class to build a batch of messages to send to Kafka."""

     def __init__(self) -> None:
         """Initializes a new BatchBuilder instance."""
-        self._builder: List[MsgToSend] = []
+        self._builder: List[AnyDict] = []

     def append(
         self,
@@ -72,20 +55,17 @@ def append(
         headers: Optional[List[Tuple[str, bytes]]] = None,
     ) -> None:
         """Appends a message to the batch with optional timestamp, key, value, and headers."""
-        if timestamp is None:
-            timestamp = round(time() * 1000)
-
         if key is None and value is None:
             raise KafkaException(
                 KafkaError(40, reason="Both key and value can't be None")
             )

-        if headers is None:
-            headers = []
-
-        self._builder.append(
-            MsgToSend(timestamp=timestamp, key=key, value=value, headers=headers)
-        )
+        self._builder.append({
+            "timestamp_ms": timestamp or round(time() * 1000),
+            "key": key,
+            "value": value,
+            "headers": headers or [],
+        })


 class AsyncConfluentProducer:
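Dropping the Pydantic `MsgToSend` model removes a model-construction and validation pass for every appended message; `append()` now buffers plain dicts whose keys match `send()`'s keyword parameters. A minimal usage sketch, assuming a reachable local broker; the topic name is made up and the `send_batch(batch, topic, partition=...)` call shape is inferred from the hunk further down:

    import asyncio

    from faststream.confluent.client import AsyncConfluentProducer


    async def main() -> None:
        # Hypothetical connection settings; point these at a real cluster.
        producer = AsyncConfluentProducer(bootstrap_servers="localhost:9092")

        batch = producer.create_batch()
        batch.append(key=b"k1", value=b"first")   # "timestamp_ms" defaults to now
        batch.append(key=b"k2", value=b"second")  # "headers" defaults to []

        # send_batch() unpacks each buffered dict as send(topic=..., **msg).
        await producer.send_batch(batch, "demo-topic", partition=None)
        await producer.stop()


    asyncio.run(main())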
@@ -94,7 +74,6 @@ class AsyncConfluentProducer:
     def __init__(
         self,
         *,
-        loop: Optional[asyncio.AbstractEventLoop] = None,
         bootstrap_servers: Union[str, List[str]] = "localhost",
         client_id: Optional[str] = None,
         metadata_max_age_ms: int = 300000,
@@ -166,13 +145,10 @@ def __init__(
         )

         self.producer = Producer(self.config, logger=self.logger)
-        # self.producer.init_transactions()
-        self.loop = loop or asyncio.get_event_loop()
-        self.loop.run_in_executor(None, self.producer.list_topics)

     async def stop(self) -> None:
         """Stop the Kafka producer and flush remaining messages."""
-        self.producer.flush()
+        await call_or_await(self.producer.flush)

     async def send(
         self,
@@ -196,11 +172,12 @@
         }
         if timestamp_ms is not None:
             kwargs["timestamp"] = timestamp_ms
-        self.producer.produce(
+        await call_or_await(
+            self.producer.produce,
             topic,
             **kwargs,
         )
-        self.producer.poll(0)
+        await call_or_await(self.producer.poll, 0)

     def create_batch(self) -> BatchBuilder:
         """Creates a batch for sending multiple messages.
@@ -218,10 +195,7 @@ async def send_batch(
                 self.send(
                     topic=topic,
                     partition=partition,
-                    timestamp_ms=msg.timestamp,
-                    key=msg.key,
-                    value=msg.value,
-                    headers=msg.headers,  # type: ignore[arg-type]
+                    **msg,
                 )
                 for msg in batch._builder
             ]
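Since `append()` now stores exactly the keyword arguments `send()` accepts, the field-by-field copying (and its `# type: ignore[arg-type]`) collapses into `**msg`. One subtle semantic shift from the `append()` hunk above: `timestamp or round(time() * 1000)` replaces an explicit `timestamp=0` with the current time, whereas the old `if timestamp is None` check preserved a zero timestamp.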
@@ -265,17 +239,21 @@ def create_topics(
     )

     fs = admin_client.create_topics(
-        [NewTopic(topic, num_partitions=1, replication_factor=1) for topic in topics]
+        [NewTopic(topic, num_partitions=1, replication_factor=1)
+         for topic in topics]
     )

     for topic, f in fs.items():
         try:
             f.result()  # The result itself is None
         except Exception as e:  # noqa: PERF203
             if "TOPIC_ALREADY_EXISTS" not in str(e):
-                logger.warning(f"Failed to create topic {topic}: {e}")  # type: ignore[union-attr]
+                logger.warning(  # type: ignore[union-attr]
+                    f"Failed to create topic {topic}: {e}"
+                )
         else:
-            logger.info(f"Topic `{topic}` created.")  # type: ignore[union-attr]
+            logger.info(  # type: ignore[union-attr]
+                f"Topic `{topic}` created."
+            )


 class AsyncConfluentConsumer:
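`create_topics()` itself stays synchronous; callers now offload it via `call_or_await` (see the consumer hunk below) rather than `loop.run_in_executor`. Because `TOPIC_ALREADY_EXISTS` errors are tolerated, the call is idempotent and safe to repeat on every consumer start.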
@@ -284,7 +262,6 @@ class AsyncConfluentConsumer:
     def __init__(
         self,
         *topics: str,
-        loop: Optional[asyncio.AbstractEventLoop] = None,
         bootstrap_servers: Union[str, List[str]] = "localhost",
         client_id: Optional[str] = "confluent-kafka-consumer",
         group_id: Optional[str] = None,
@@ -363,6 +340,7 @@ def __init__(
"connections.max.idle.ms": connections_max_idle_ms,
"isolation.level": isolation_level,
}
self.allow_auto_create_topics = allow_auto_create_topics
self.config = {**self.config, **config_from_params}

if sasl_mechanism in ["PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"]:
@@ -374,21 +352,20 @@
             }
         )

-        self.loop = loop or asyncio.get_event_loop()
+        self.consumer = Consumer(self.config, logger=self.logger)

-        if allow_auto_create_topics:
-            self.loop.run_in_executor(
-                None, create_topics, self.topics, self.config, logger
+    async def start(self) -> None:
+        """Starts the Kafka consumer and subscribes to the specified topics."""
+        if self.allow_auto_create_topics:
+            await call_or_await(
+                create_topics, self.topics, self.config, logger
             )
         else:
             logger.warning(  # type: ignore[union-attr]
                 "Auto create topics is disabled. Make sure the topics exist."
             )
-        self.consumer = Consumer(self.config, logger=self.logger)

-    async def start(self) -> None:
-        """Starts the Kafka consumer and subscribes to the specified topics."""
-        self.consumer.subscribe(self.topics)
+        await call_or_await(self.consumer.subscribe, self.topics)

     async def commit(self, asynchronous: bool = True) -> None:
         """Commits the offsets of all messages returned by the last poll operation."""
