diff --git a/faststream/nats/publisher/usecase.py b/faststream/nats/publisher/usecase.py index 6f52bd2d96..291eb94ac2 100644 --- a/faststream/nats/publisher/usecase.py +++ b/faststream/nats/publisher/usecase.py @@ -3,7 +3,7 @@ from typing import TYPE_CHECKING, Any, Dict, Iterable, Optional, Union from nats.aio.msg import Msg -from typing_extensions import Annotated, Doc, override +from typing_extensions import override from faststream.broker.message import gen_cor_id from faststream.broker.publisher.usecase import PublisherUsecase @@ -61,68 +61,43 @@ def __hash__(self) -> int: @override async def publish( self, - message: Annotated[ - "SendableMessage", - Doc( - "Message body to send. " - "Can be any encodable object (native python types or `pydantic.BaseModel`)." - ), - ], - subject: Annotated[ - str, - Doc("NATS subject to send message."), - ] = "", + message: "SendableMessage", + subject: str = "", *, - headers: Annotated[ - Optional[Dict[str, str]], - Doc( - "Message headers to store metainformation. " - "**content-type** and **correlation_id** will be set automatically by framework anyway." - ), - ] = None, - reply_to: Annotated[ - str, - Doc("NATS subject name to send response."), - ] = "", - correlation_id: Annotated[ - Optional[str], - Doc( - "Manual message **correlation_id** setter. " - "**correlation_id** is a useful option to trace messages." - ), - ] = None, - stream: Annotated[ - Optional[str], - Doc( - "This option validates that the target subject is in presented stream. " - "Can be omitted without any effect." - ), - ] = None, - timeout: Annotated[ - Optional[float], - Doc("Timeout to send message to NATS."), - ] = None, - rpc: Annotated[ - bool, - Doc("Whether to wait for reply in blocking mode."), - ] = False, - rpc_timeout: Annotated[ - Optional[float], - Doc("RPC reply waiting time."), - ] = 30.0, - raise_timeout: Annotated[ - bool, - Doc( - "Whetever to raise `TimeoutError` or return `None` at **rpc_timeout**. " - "RPC request returns `None` at timeout by default." - ), - ] = False, + headers: Optional[Dict[str, str]] = None, + reply_to: str = "", + correlation_id: Optional[str] = None, + stream: Optional[str] = None, + timeout: Optional[float] = None, + rpc: bool = False, + rpc_timeout: Optional[float] = 30.0, + raise_timeout: bool = False, # publisher specific - _extra_middlewares: Annotated[ - Iterable["PublisherMiddleware"], - Doc("Extra middlewares to wrap publishing process."), - ] = (), + _extra_middlewares: Iterable["PublisherMiddleware"] = (), ) -> Optional[Any]: + """Publish message directly. + + Args: + message (SendableMessage): Message body to send. + Can be any encodable object (native python types or `pydantic.BaseModel`). + subject (str): NATS subject to send message (default is `''`). + headers (:obj:`dict` of :obj:`str`: :obj:`str`, optional): Message headers to store metainformation (default is `None`). + **content-type** and **correlation_id** will be set automatically by framework anyway. + + reply_to (str): NATS subject name to send response (default is `None`). + correlation_id (str, optional): Manual message **correlation_id** setter (default is `None`). + **correlation_id** is a useful option to trace messages. + + stream (str, optional): This option validates that the target subject is in presented stream (default is `None`). + Can be omitted without any effect. + timeout (float, optional): Timeout to send message to NATS in seconds (default is `None`). + rpc (bool): Whether to wait for reply in blocking mode (default is `False`). + rpc_timeout (float, optional): RPC reply waiting time (default is `30.0`). + raise_timeout (bool): Whetever to raise `TimeoutError` or return `None` at **rpc_timeout** (default is `False`). + RPC request returns `None` at timeout by default. + + _extra_middlewares (:obj:`Iterable` of :obj:`PublisherMiddleware`): Extra middlewares to wrap publishing process (default is `()`). + """ assert self._producer, NOT_CONNECTED_YET # nosec B101 kwargs: AnyDict = { diff --git a/faststream/nats/schemas/kv_watch.py b/faststream/nats/schemas/kv_watch.py index a1f50fce96..d756f8d28d 100644 --- a/faststream/nats/schemas/kv_watch.py +++ b/faststream/nats/schemas/kv_watch.py @@ -1,12 +1,21 @@ from typing import Optional -from typing_extensions import Annotated, Doc - from faststream.broker.schemas import NameRequired class KvWatch(NameRequired): - """A class to represent a NATS kv watch subscription.""" + """A class to represent a NATS kv watch subscription. + + Args: + bucket (str): Bucket name. + headers_only (bool): Whether to receive only headers (default is `False`). + include_history (bool): Whether to include history (default is `False`). + ignore_deletes (bool): Whether to ignore deletes (default is `False`). + meta_only (bool): Whether to receive only metadata (default is `False`). + inactive_threshold (:obj:`float`, optional): Inactivity threshold (default is `None`). + timeout (:obj:`float`, optional): Timeout in seconds (default is `5.0`). + declare (bool): Whether to create bucket automatically or just connect to it (default is `True`). + """ __slots__ = ( "bucket", @@ -21,39 +30,15 @@ class KvWatch(NameRequired): def __init__( self, - bucket: Annotated[ - str, - Doc("Bucket name."), - ], - headers_only: Annotated[ - bool, - Doc("Whether to receive only headers."), - ] = False, - include_history: Annotated[ - bool, - Doc("Whether to include history."), - ] = False, - ignore_deletes: Annotated[ - bool, - Doc("Whether to ignore deletes."), - ] = False, - meta_only: Annotated[ - bool, - Doc("Whether to receive only metadata."), - ] = False, - inactive_threshold: Annotated[ - Optional[float], - Doc("Inactivity threshold."), - ] = None, - timeout: Annotated[ - Optional[float], - Doc("Timeout in seconds."), - ] = 5.0, + bucket: str, + headers_only: bool = False, + include_history: bool = False, + ignore_deletes: bool = False, + meta_only: bool = False, + inactive_threshold: Optional[float] = None, + timeout: Optional[float] = 5.0, # custom - declare: Annotated[ - bool, - Doc("Whether to create bucket automatically or just connect to it."), - ] = True, + declare: bool = True, ) -> None: super().__init__(bucket) diff --git a/faststream/nats/schemas/obj_watch.py b/faststream/nats/schemas/obj_watch.py index 998bef9482..3a5648e7c8 100644 --- a/faststream/nats/schemas/obj_watch.py +++ b/faststream/nats/schemas/obj_watch.py @@ -1,10 +1,16 @@ from typing import Literal, Optional, Union, overload -from typing_extensions import Annotated, Doc - class ObjWatch: - """A class to represent a NATS object storage watch subscription.""" + """A class to represent a NATS object storage watch subscription. + + Args: + ignore_deletes (bool): Ignore delete events (default is `False`). + include_history (bool): Include history (default is `False`). + meta_only (bool): Only metadata. (default is `False`). + timeout (float): The timeout for the watch in seconds (default is `5.0`). + declare (bool): Whether to create object storage automatically or just connect to it (default is `True`). + """ __slots__ = ( "ignore_deletes", @@ -16,29 +22,12 @@ class ObjWatch: def __init__( self, - ignore_deletes: Annotated[ - bool, - Doc("Ignore delete events."), - ] = False, - include_history: Annotated[ - bool, - Doc("Include history."), - ] = False, - meta_only: Annotated[ - bool, - Doc("Only metadata."), - ] = False, - timeout: Annotated[ - float, - Doc("The timeout for the watch."), - ] = 5.0, + ignore_deletes: bool = False, + include_history: bool = False, + meta_only: bool = False, + timeout: float = 5.0, # custom - declare: Annotated[ - bool, - Doc( - "Whether to create object storage automatically or just connect to it." - ), - ] = True, + declare: bool = True, ) -> None: self.ignore_deletes = ignore_deletes self.include_history = include_history diff --git a/faststream/nats/schemas/pull_sub.py b/faststream/nats/schemas/pull_sub.py index 7544d17b74..b38b48ebdb 100644 --- a/faststream/nats/schemas/pull_sub.py +++ b/faststream/nats/schemas/pull_sub.py @@ -1,10 +1,15 @@ from typing import Literal, Optional, Union, overload -from typing_extensions import Annotated, Doc - class PullSub: - """A class to represent a NATS pull subscription.""" + """A class to represent a NATS pull subscription. + + Args: + batch_size (int): Consuming messages batch size. (default is `1`). + timeout (:obj:`float`, optional): Wait this time for required batch size will be accumulated in stream + in seconds (default is `5.0`). + batch (bool): Whether to propagate consuming batch as iterable object to your handler (default is `False`). + """ __slots__ = ( "batch", @@ -14,22 +19,9 @@ class PullSub: def __init__( self, - batch_size: Annotated[ - int, - Doc("Consuming messages batch size."), - ] = 1, - timeout: Annotated[ - Optional[float], - Doc( - "Wait this time for required batch size will be accumulated in stream." - ), - ] = 5.0, - batch: Annotated[ - bool, - Doc( - "Whether to propagate consuming batch as iterable object to your handler." - ), - ] = False, + batch_size: int = 1, + timeout: Optional[float] = 5.0, + batch: bool = False, ) -> None: self.batch_size = batch_size self.batch = batch