Skip to content

Commit

Permalink
Removed Doc and Added docstrings instead (#1662)
Browse files Browse the repository at this point in the history
* removed Doc and added docstrings instead

* fixed formating
  • Loading branch information
Kirill-Stepankov authored Aug 11, 2024
1 parent 7fda6bf commit 7fefa25
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 139 deletions.
95 changes: 35 additions & 60 deletions faststream/nats/publisher/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from typing import TYPE_CHECKING, Any, Dict, Iterable, Optional, Union

from nats.aio.msg import Msg
from typing_extensions import Annotated, Doc, override
from typing_extensions import override

from faststream.broker.message import gen_cor_id
from faststream.broker.publisher.usecase import PublisherUsecase
Expand Down Expand Up @@ -61,68 +61,43 @@ def __hash__(self) -> int:
@override
async def publish(
self,
message: Annotated[
"SendableMessage",
Doc(
"Message body to send. "
"Can be any encodable object (native python types or `pydantic.BaseModel`)."
),
],
subject: Annotated[
str,
Doc("NATS subject to send message."),
] = "",
message: "SendableMessage",
subject: str = "",
*,
headers: Annotated[
Optional[Dict[str, str]],
Doc(
"Message headers to store metainformation. "
"**content-type** and **correlation_id** will be set automatically by framework anyway."
),
] = None,
reply_to: Annotated[
str,
Doc("NATS subject name to send response."),
] = "",
correlation_id: Annotated[
Optional[str],
Doc(
"Manual message **correlation_id** setter. "
"**correlation_id** is a useful option to trace messages."
),
] = None,
stream: Annotated[
Optional[str],
Doc(
"This option validates that the target subject is in presented stream. "
"Can be omitted without any effect."
),
] = None,
timeout: Annotated[
Optional[float],
Doc("Timeout to send message to NATS."),
] = None,
rpc: Annotated[
bool,
Doc("Whether to wait for reply in blocking mode."),
] = False,
rpc_timeout: Annotated[
Optional[float],
Doc("RPC reply waiting time."),
] = 30.0,
raise_timeout: Annotated[
bool,
Doc(
"Whetever to raise `TimeoutError` or return `None` at **rpc_timeout**. "
"RPC request returns `None` at timeout by default."
),
] = False,
headers: Optional[Dict[str, str]] = None,
reply_to: str = "",
correlation_id: Optional[str] = None,
stream: Optional[str] = None,
timeout: Optional[float] = None,
rpc: bool = False,
rpc_timeout: Optional[float] = 30.0,
raise_timeout: bool = False,
# publisher specific
_extra_middlewares: Annotated[
Iterable["PublisherMiddleware"],
Doc("Extra middlewares to wrap publishing process."),
] = (),
_extra_middlewares: Iterable["PublisherMiddleware"] = (),
) -> Optional[Any]:
"""Publish message directly.
Args:
message (SendableMessage): Message body to send.
Can be any encodable object (native python types or `pydantic.BaseModel`).
subject (str): NATS subject to send message (default is `''`).
headers (:obj:`dict` of :obj:`str`: :obj:`str`, optional): Message headers to store metainformation (default is `None`).
**content-type** and **correlation_id** will be set automatically by framework anyway.
reply_to (str): NATS subject name to send response (default is `None`).
correlation_id (str, optional): Manual message **correlation_id** setter (default is `None`).
**correlation_id** is a useful option to trace messages.
stream (str, optional): This option validates that the target subject is in presented stream (default is `None`).
Can be omitted without any effect.
timeout (float, optional): Timeout to send message to NATS in seconds (default is `None`).
rpc (bool): Whether to wait for reply in blocking mode (default is `False`).
rpc_timeout (float, optional): RPC reply waiting time (default is `30.0`).
raise_timeout (bool): Whetever to raise `TimeoutError` or return `None` at **rpc_timeout** (default is `False`).
RPC request returns `None` at timeout by default.
_extra_middlewares (:obj:`Iterable` of :obj:`PublisherMiddleware`): Extra middlewares to wrap publishing process (default is `()`).
"""
assert self._producer, NOT_CONNECTED_YET # nosec B101

kwargs: AnyDict = {
Expand Down
55 changes: 20 additions & 35 deletions faststream/nats/schemas/kv_watch.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,21 @@
from typing import Optional

from typing_extensions import Annotated, Doc

from faststream.broker.schemas import NameRequired


class KvWatch(NameRequired):
"""A class to represent a NATS kv watch subscription."""
"""A class to represent a NATS kv watch subscription.
Args:
bucket (str): Bucket name.
headers_only (bool): Whether to receive only headers (default is `False`).
include_history (bool): Whether to include history (default is `False`).
ignore_deletes (bool): Whether to ignore deletes (default is `False`).
meta_only (bool): Whether to receive only metadata (default is `False`).
inactive_threshold (:obj:`float`, optional): Inactivity threshold (default is `None`).
timeout (:obj:`float`, optional): Timeout in seconds (default is `5.0`).
declare (bool): Whether to create bucket automatically or just connect to it (default is `True`).
"""

__slots__ = (
"bucket",
Expand All @@ -21,39 +30,15 @@ class KvWatch(NameRequired):

def __init__(
self,
bucket: Annotated[
str,
Doc("Bucket name."),
],
headers_only: Annotated[
bool,
Doc("Whether to receive only headers."),
] = False,
include_history: Annotated[
bool,
Doc("Whether to include history."),
] = False,
ignore_deletes: Annotated[
bool,
Doc("Whether to ignore deletes."),
] = False,
meta_only: Annotated[
bool,
Doc("Whether to receive only metadata."),
] = False,
inactive_threshold: Annotated[
Optional[float],
Doc("Inactivity threshold."),
] = None,
timeout: Annotated[
Optional[float],
Doc("Timeout in seconds."),
] = 5.0,
bucket: str,
headers_only: bool = False,
include_history: bool = False,
ignore_deletes: bool = False,
meta_only: bool = False,
inactive_threshold: Optional[float] = None,
timeout: Optional[float] = 5.0,
# custom
declare: Annotated[
bool,
Doc("Whether to create bucket automatically or just connect to it."),
] = True,
declare: bool = True,
) -> None:
super().__init__(bucket)

Expand Down
39 changes: 14 additions & 25 deletions faststream/nats/schemas/obj_watch.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
from typing import Literal, Optional, Union, overload

from typing_extensions import Annotated, Doc


class ObjWatch:
"""A class to represent a NATS object storage watch subscription."""
"""A class to represent a NATS object storage watch subscription.
Args:
ignore_deletes (bool): Ignore delete events (default is `False`).
include_history (bool): Include history (default is `False`).
meta_only (bool): Only metadata. (default is `False`).
timeout (float): The timeout for the watch in seconds (default is `5.0`).
declare (bool): Whether to create object storage automatically or just connect to it (default is `True`).
"""

__slots__ = (
"ignore_deletes",
Expand All @@ -16,29 +22,12 @@ class ObjWatch:

def __init__(
self,
ignore_deletes: Annotated[
bool,
Doc("Ignore delete events."),
] = False,
include_history: Annotated[
bool,
Doc("Include history."),
] = False,
meta_only: Annotated[
bool,
Doc("Only metadata."),
] = False,
timeout: Annotated[
float,
Doc("The timeout for the watch."),
] = 5.0,
ignore_deletes: bool = False,
include_history: bool = False,
meta_only: bool = False,
timeout: float = 5.0,
# custom
declare: Annotated[
bool,
Doc(
"Whether to create object storage automatically or just connect to it."
),
] = True,
declare: bool = True,
) -> None:
self.ignore_deletes = ignore_deletes
self.include_history = include_history
Expand Down
30 changes: 11 additions & 19 deletions faststream/nats/schemas/pull_sub.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
from typing import Literal, Optional, Union, overload

from typing_extensions import Annotated, Doc


class PullSub:
"""A class to represent a NATS pull subscription."""
"""A class to represent a NATS pull subscription.
Args:
batch_size (int): Consuming messages batch size. (default is `1`).
timeout (:obj:`float`, optional): Wait this time for required batch size will be accumulated in stream
in seconds (default is `5.0`).
batch (bool): Whether to propagate consuming batch as iterable object to your handler (default is `False`).
"""

__slots__ = (
"batch",
Expand All @@ -14,22 +19,9 @@ class PullSub:

def __init__(
self,
batch_size: Annotated[
int,
Doc("Consuming messages batch size."),
] = 1,
timeout: Annotated[
Optional[float],
Doc(
"Wait this time for required batch size will be accumulated in stream."
),
] = 5.0,
batch: Annotated[
bool,
Doc(
"Whether to propagate consuming batch as iterable object to your handler."
),
] = False,
batch_size: int = 1,
timeout: Optional[float] = 5.0,
batch: bool = False,
) -> None:
self.batch_size = batch_size
self.batch = batch
Expand Down

0 comments on commit 7fefa25

Please sign in to comment.