Skip to content

Commit

Permalink
tests: fix all tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Lancetnik committed Sep 17, 2024
1 parent 5d4cac1 commit cc55a61
Show file tree
Hide file tree
Showing 24 changed files with 133 additions and 71 deletions.
6 changes: 4 additions & 2 deletions examples/e04_msg_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)

subscriber = broker.subscriber("test-queue")

@broker.subscriber("test-queue", filter=lambda m: m.content_type == "application/json")

@subscriber(filter=lambda m: m.content_type == "application/json")
async def handle_json(msg, logger: Logger):
logger.info(f"JSON message: {msg}")


@broker.subscriber("test-queue")
@subscriber
async def handle_other_messages(msg, logger: Logger):
logger.info(f"Default message: {msg}")

Expand Down
8 changes: 4 additions & 4 deletions faststream/_internal/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,15 +222,15 @@ async def connect(self, **kwargs: Any) -> ConnectionType:
connection_kwargs = self._connection_kwargs.copy()
connection_kwargs.update(kwargs)
self._connection = await self._connect(**connection_kwargs)
self.setup()
self._setup()
return self._connection

@abstractmethod
async def _connect(self) -> ConnectionType:
"""Connect to a resource."""
raise NotImplementedError()

def setup(self) -> None:
def _setup(self) -> None:
"""Prepare all Broker entities to startup."""
for h in self._subscribers.values():
self.setup_subscriber(h)
Expand All @@ -246,7 +246,7 @@ def setup_subscriber(
"""Setup the Subscriber to prepare it to starting."""
data = self._subscriber_setup_extra.copy()
data.update(kwargs)
subscriber.setup(**data)
subscriber._setup(**data)

def setup_publisher(
self,
Expand All @@ -256,7 +256,7 @@ def setup_publisher(
"""Setup the Publisher to prepare it to starting."""
data = self._publisher_setup_extra.copy()
data.update(kwargs)
publisher.setup(**data)
publisher._setup(**data)

@property
def _subscriber_setup_extra(self) -> "AnyDict":
Expand Down
37 changes: 25 additions & 12 deletions faststream/_internal/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,9 +210,19 @@ def _run(
)
def publish(
ctx: typer.Context,
app: str = typer.Argument(..., help="FastStream app instance, e.g., main:app."),
message: str = typer.Argument(..., help="Message to be published."),
rpc: bool = typer.Option(False, help="Enable RPC mode and system output."),
app: str = typer.Argument(
...,
help="FastStream app instance, e.g., main:app.",
),
message: str = typer.Argument(
...,
help="Message to be published.",
),
rpc: bool = typer.Option(
False,
is_flag=True,
help="Enable RPC mode and system output.",
),
is_factory: bool = typer.Option(
False,
"--factory",
Expand All @@ -227,23 +237,20 @@ def publish(
These are parsed and passed to the broker's publish method.
"""
app, extra = parse_cli_args(app, *ctx.args)

extra["message"] = message
extra["rpc"] = rpc
if "timeout" in extra:
extra["timeout"] = float(extra["timeout"])

try:
if not app:
raise ValueError("App parameter is required.")
if not message:
raise ValueError("Message parameter is required.")

_, app_obj = import_from_string(app)
if callable(app_obj) and is_factory:
app_obj = app_obj()

if not app_obj.broker:
raise ValueError("Broker instance not found in the app.")

result = anyio.run(publish_message, app_obj.broker, extra)
result = anyio.run(publish_message, app_obj.broker, rpc, extra)

if rpc:
typer.echo(result)
Expand All @@ -253,10 +260,16 @@ def publish(
sys.exit(1)


async def publish_message(broker: "BrokerUsecase[Any, Any]", extra: "AnyDict") -> Any:
async def publish_message(
broker: "BrokerUsecase[Any, Any]", rpc: bool, extra: "AnyDict"
) -> Any:
try:
async with broker:
return await broker.publish(**extra)
if rpc:
msg = await broker.request(**extra)
return msg
else:
return await broker.publish(**extra)
except Exception as e:
typer.echo(f"Error when broker was publishing: {e}")
sys.exit(1)
3 changes: 3 additions & 0 deletions faststream/_internal/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ class _EmptyPlaceholder:
def __repr__(self) -> str:
return "EMPTY"

def __bool__(self) -> bool:
return False

def __eq__(self, other: object) -> bool:
if not isinstance(other, _EmptyPlaceholder):
return NotImplemented
Expand Down
2 changes: 1 addition & 1 deletion faststream/_internal/proto.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

class SetupAble(Protocol):
@abstractmethod
def setup(self) -> None: ...
def _setup(self) -> None: ...


class Endpoint(SetupAble, Protocol):
Expand Down
2 changes: 1 addition & 1 deletion faststream/_internal/publisher/proto.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def create() -> "PublisherProto[MsgType]":

@override
@abstractmethod
def setup( # type: ignore[override]
def _setup( # type: ignore[override]
self,
*,
producer: Optional["ProducerProto"],
Expand Down
2 changes: 1 addition & 1 deletion faststream/_internal/publisher/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def add_middleware(self, middleware: "BrokerMiddleware[MsgType]") -> None:
self._broker_middlewares = (*self._broker_middlewares, middleware)

@override
def setup( # type: ignore[override]
def _setup( # type: ignore[override]
self,
*,
producer: Optional["ProducerProto"],
Expand Down
2 changes: 1 addition & 1 deletion faststream/_internal/subscriber/call_item.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def __repr__(self) -> str:
return f"<'{self.call_name}': filter='{filter_name}'>"

@override
def setup( # type: ignore[override]
def _setup( # type: ignore[override]
self,
*,
parser: "AsyncCallable",
Expand Down
2 changes: 1 addition & 1 deletion faststream/_internal/subscriber/proto.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def get_log_context(

@override
@abstractmethod
def setup( # type: ignore[override]
def _setup( # type: ignore[override]
self,
*,
logger: Optional["LoggerProto"],
Expand Down
4 changes: 2 additions & 2 deletions faststream/_internal/subscriber/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ def add_middleware(self, middleware: "BrokerMiddleware[MsgType]") -> None:
self._broker_middlewares = (*self._broker_middlewares, middleware)

@override
def setup( # type: ignore[override]
def _setup( # type: ignore[override]
self,
*,
logger: Optional["LoggerProto"],
Expand Down Expand Up @@ -181,7 +181,7 @@ def setup( # type: ignore[override]
self._parser = async_parser
self._decoder = async_decoder

call.setup(
call._setup(
parser=async_parser,
decoder=async_decoder,
apply_types=apply_types,
Expand Down
2 changes: 1 addition & 1 deletion faststream/_internal/testing/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ def _patch_broker(self, broker: Broker) -> Generator[None, None, None]:
yield

def _fake_start(self, broker: Broker, *args: Any, **kwargs: Any) -> None:
broker.setup()
broker._setup()

patch_broker_calls(broker)

Expand Down
4 changes: 2 additions & 2 deletions faststream/confluent/subscriber/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def __init__(
self.builder = None

@override
def setup( # type: ignore[override]
def _setup( # type: ignore[override]
self,
*,
client_id: Optional[str],
Expand All @@ -124,7 +124,7 @@ def setup( # type: ignore[override]
self.client_id = client_id
self.builder = builder

super().setup(
super()._setup(
logger=logger,
producer=producer,
graceful_timeout=graceful_timeout,
Expand Down
4 changes: 2 additions & 2 deletions faststream/kafka/subscriber/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def __init__(
self.task = None

@override
def setup( # type: ignore[override]
def _setup( # type: ignore[override]
self,
*,
client_id: Optional[str],
Expand All @@ -129,7 +129,7 @@ def setup( # type: ignore[override]
self.client_id = client_id
self.builder = builder

super().setup(
super()._setup(
logger=logger,
producer=producer,
graceful_timeout=graceful_timeout,
Expand Down
20 changes: 20 additions & 0 deletions faststream/message/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,26 @@ def __init__(
self.committed: Optional[AckStatus] = None
self.processed = False

def __repr__(self) -> str:
inner = ", ".join(
filter(
bool,
(
f"body={self.body}",
f"content_type={self.content_type}",
f"message_id={self.message_id}",
f"correlation_id={self.correlation_id}",
f"reply_to={self.reply_to}" if self.reply_to else "",
f"headers={self.headers}",
f"path={self.path}",
f"committed={self.committed}",
f"raw_message={self.raw_message}",
),
)
)

return f"{self.__class__.__name__}({inner})"

async def decode(self) -> Optional["DecodedMessage"]:
"""Serialize the message by lazy decoder."""
# TODO: make it lazy after `decoded_body` removed
Expand Down
15 changes: 6 additions & 9 deletions faststream/message/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from uuid import uuid4

from faststream._internal._compat import dump_json, json_loads
from faststream._internal.constants import EMPTY, ContentTypes
from faststream._internal.constants import ContentTypes

if TYPE_CHECKING:
from faststream._internal.basic_types import DecodedMessage, SendableMessage
Expand All @@ -30,20 +30,17 @@ def decode_message(message: "StreamMessage[Any]") -> "DecodedMessage":
body: Any = getattr(message, "body", message)
m: DecodedMessage = body

if (content_type := getattr(message, "content_type", EMPTY)) is not EMPTY:
content_type = cast(Optional[str], content_type)
if content_type := getattr(message, "content_type", False):
content_type = ContentTypes(cast(str, content_type))

if not content_type:
with suppress(json.JSONDecodeError, UnicodeDecodeError):
m = json_loads(body)

elif ContentTypes.text.value in content_type:
if content_type is ContentTypes.text:
m = body.decode()

elif ContentTypes.json.value in content_type:
elif content_type is ContentTypes.json:
m = json_loads(body)

else:
# content-type not set
with suppress(json.JSONDecodeError, UnicodeDecodeError):
m = json_loads(body)

Expand Down
4 changes: 2 additions & 2 deletions faststream/nats/subscriber/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ def __init__(
self.producer = None

@override
def setup( # type: ignore[override]
def _setup( # type: ignore[override]
self,
*,
connection: ConnectionType,
Expand All @@ -146,7 +146,7 @@ def setup( # type: ignore[override]
) -> None:
self._connection = connection

super().setup(
super()._setup(
logger=logger,
producer=producer,
graceful_timeout=graceful_timeout,
Expand Down
4 changes: 2 additions & 2 deletions faststream/rabbit/publisher/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ def __init__(
self.virtual_host = ""

@override
def setup( # type: ignore[override]
def _setup( # type: ignore[override]
self,
*,
producer: Optional["AioPikaFastProducer"],
Expand All @@ -169,7 +169,7 @@ def setup( # type: ignore[override]
) -> None:
self.app_id = app_id
self.virtual_host = virtual_host
super().setup(producer=producer)
super()._setup(producer=producer)

@property
def routing(self) -> str:
Expand Down
4 changes: 2 additions & 2 deletions faststream/rabbit/subscriber/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def __init__(
self.declarer = None

@override
def setup( # type: ignore[override]
def _setup( # type: ignore[override]
self,
*,
app_id: Optional[str],
Expand All @@ -120,7 +120,7 @@ def setup( # type: ignore[override]
self.virtual_host = virtual_host
self.declarer = declarer

super().setup(
super()._setup(
logger=logger,
producer=producer,
graceful_timeout=graceful_timeout,
Expand Down
4 changes: 2 additions & 2 deletions faststream/redis/subscriber/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def __init__(
self.task: Optional[asyncio.Task[None]] = None

@override
def setup( # type: ignore[override]
def _setup( # type: ignore[override]
self,
*,
connection: Optional["Redis[bytes]"],
Expand All @@ -122,7 +122,7 @@ def setup( # type: ignore[override]
) -> None:
self._client = connection

super().setup(
super()._setup(
logger=logger,
producer=producer,
graceful_timeout=graceful_timeout,
Expand Down
2 changes: 1 addition & 1 deletion faststream/specification/asyncapi/v2_6_0/generate.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def get_app_schema(app: Application) -> Schema:
if broker is None: # pragma: no cover
raise RuntimeError()

broker.setup()
broker._setup()

servers = get_broker_server(broker)
channels = get_broker_channels(broker)
Expand Down
2 changes: 1 addition & 1 deletion faststream/specification/asyncapi/v3_0_0/generate.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def get_app_schema(app: Application) -> Schema:
broker = app.broker
if broker is None: # pragma: no cover
raise RuntimeError()
broker.setup()
broker._setup()

servers = get_broker_server(broker)
channels = get_broker_channels(broker)
Expand Down
Loading

0 comments on commit cc55a61

Please sign in to comment.