Skip to content

Commit

Permalink
Fix: rewrite init method, override as_publish_kwargs
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniil Dumchenko committed Jul 22, 2024
1 parent 7138709 commit 17d31b8
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 20 deletions.
24 changes: 22 additions & 2 deletions faststream/confluent/response.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from typing import TYPE_CHECKING, Optional

from typing_extensions import override

from faststream.broker.response import Response

if TYPE_CHECKING:
Expand All @@ -13,7 +15,25 @@ def __init__(
*,
headers: Optional["AnyDict"] = None,
correlation_id: Optional[str] = None,
partition: Optional[int] = None,
timestamp_ms: Optional[int] = None,
key: Optional[bytes] = None,
) -> None: ...
) -> None:
super().__init__(
body=body,
headers=headers,
correlation_id=correlation_id,
)

self.timestamp_ms = timestamp_ms
self.key = key


@override
def as_publish_kwargs(self) -> "AnyDict":
publish_options = {
"headers": self.headers,
"correlation_id": self.correlation_id,
"timestamp_ms": self.timestamp_ms,
"key": self.key,
}
return publish_options
23 changes: 21 additions & 2 deletions faststream/kafka/response.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from typing import TYPE_CHECKING, Optional

from typing_extensions import override

from faststream.broker.response import Response

if TYPE_CHECKING:
Expand All @@ -13,7 +15,24 @@ def __init__(
*,
headers: Optional["AnyDict"] = None,
correlation_id: Optional[str] = None,
partition: Optional[int] = None,
timestamp_ms: Optional[int] = None,
key: Optional[bytes] = None,
) -> None: ...
) -> None:
super().__init__(
body=body,
headers=headers,
correlation_id=correlation_id,
)

self.timestamp_ms = timestamp_ms
self.key = key

@override
def as_publish_kwargs(self) -> "AnyDict":
publish_options = {
"headers": self.headers,
"correlation_id": self.correlation_id,
"timestamp_ms": self.timestamp_ms,
"key": self.key,
}
return publish_options
21 changes: 16 additions & 5 deletions faststream/nats/response.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from typing import TYPE_CHECKING, Optional

from typing_extensions import override

from faststream.broker.response import Response

if TYPE_CHECKING:
Expand All @@ -13,8 +15,17 @@ def __init__(
*,
headers: Optional["AnyDict"] = None,
correlation_id: Optional[str] = None,
timeout: Optional[float] = None,
rpc: bool = False,
rpc_timeout: Optional[float] = 30.0,
raise_timeout: bool = False,
) -> None: ...
) -> None:
super().__init__(
body=body,
headers=headers,
correlation_id=correlation_id,
)

@override
def as_publish_kwargs(self) -> "AnyDict":
publish_options = {
"headers": self.headers,
"correlation_id": self.correlation_id,
}
return publish_options
57 changes: 50 additions & 7 deletions faststream/rabbit/response.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,65 @@
from typing import TYPE_CHECKING, Optional

from typing_extensions import override

from faststream.broker.response import Response

if TYPE_CHECKING:
from faststream.rabbit import RabbitQueue
from aio_pika.abc import DateType, TimeoutType

from faststream.rabbit.types import AioPikaSendableMessage
from faststream.types import AnyDict


class RabbitResponse(Response):
def __init__(
self,
body: "AioPikaSendableMessage",
*,
headers: Optional["AnyDict"] = None,
correlation_id: Optional[str] = None,
queue: Optional["RabbitQueue"] = None,
routing_key: str = "",
message_id: Optional[str] = None,
rpc: bool = False,
rpc_timeout: Optional[float] = 30.0,
raise_timeout: bool = False,
) -> None: ...
mandatory: Optional[bool] = None,
immediate: Optional[bool] = None,
timeout: Optional["TimeoutType"] = None,
persist: Optional[bool] = None,
priority: Optional[int] = None,
message_type: Optional[str] = None,
content_type: Optional[str] = None,
expiration: Optional["DateType"] = None,
content_encoding: Optional[str] = None
) -> None:
super().__init__(
body=body,
headers=headers,
correlation_id=correlation_id,
)

self.message_id = message_id
self.mandatory = mandatory
self.immediate = immediate
self.timeout = timeout
self.persist = persist
self.priority = priority
self.message_type = message_type
self.content_type = content_type
self.expiration = expiration
self.content_encoding = content_encoding

@override
def as_publish_kwargs(self) -> "AnyDict":
publish_options = {
"headers": self.headers,
"correlation_id": self.correlation_id,
"message_id": self.message_id,
"mandatory": self.mandatory,
"immediate": self.immediate,
"timeout": self.timeout,
"persist": self.persist,
"priority": self.priority,
"message_type": self.message_type,
"content_type": self.content_type,
"expiration": self.expiration,
"content_encoding": self.content_encoding
}
return publish_options
20 changes: 16 additions & 4 deletions faststream/redis/response.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from typing import TYPE_CHECKING, Optional

from typing_extensions import override

from faststream.broker.response import Response

if TYPE_CHECKING:
Expand All @@ -13,7 +15,17 @@ def __init__(
*,
headers: Optional["AnyDict"] = None,
correlation_id: Optional[str] = None,
rpc: bool = False,
rpc_timeout: Optional[float] = 30.0,
raise_timeout: bool = False,
) -> None: ...
) -> None:
super().__init__(
body=body,
headers=headers,
correlation_id=correlation_id,
)

@override
def as_publish_kwargs(self) -> "AnyDict":
publish_options = {
"headers": self.headers,
"correlation_id": self.correlation_id,
}
return publish_options

0 comments on commit 17d31b8

Please sign in to comment.