Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: CLI DX improvements #1723

Merged
merged 2 commits into from
Aug 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/docs/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ search:
- [apply_types](api/faststream/apply_types.md)
- app
- [FastStream](api/faststream/app/FastStream.md)
- [catch_startup_validation_error](api/faststream/app/catch_startup_validation_error.md)
- asgi
- [AsgiFastStream](api/faststream/asgi/AsgiFastStream.md)
- [AsgiResponse](api/faststream/asgi/AsgiResponse.md)
Expand Down
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/app/catch_startup_validation_error.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.app.catch_startup_validation_error
39 changes: 24 additions & 15 deletions faststream/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import (
TYPE_CHECKING,
Any,
AsyncIterator,
Callable,
Dict,
List,
Expand Down Expand Up @@ -161,7 +162,9 @@ async def run(

set_exit(lambda *_: self.exit(), sync=False)

async with self.lifespan_context(**(run_extra_options or {})):
async with catch_startup_validation_error(), self.lifespan_context(
**(run_extra_options or {})
):
try:
async with anyio.create_task_group() as tg:
tg.start_soon(self._startup, log_level, run_extra_options)
Expand All @@ -186,20 +189,7 @@ async def start(
) -> None:
"""Executes startup hooks and start broker."""
for func in self._on_startup_calling:
call = func(**run_extra_options)

try:
from pydantic import ValidationError as PValidation

except ImportError:
await call

else:
try:
await call
except PValidation as e:
fields = [str(x["loc"][0]) for x in e.errors()]
raise ValidationError(fields=fields) from e
await func(**run_extra_options)

if self.broker is not None:
await self.broker.start()
Expand Down Expand Up @@ -237,3 +227,22 @@ async def _shutdown(self, log_level: int = logging.INFO) -> None:
def _log(self, level: int, message: str) -> None:
if self.logger is not None:
self.logger.log(level, message)


try:
from contextlib import asynccontextmanager

from pydantic import ValidationError as PValidation

@asynccontextmanager
async def catch_startup_validation_error() -> AsyncIterator[None]:
try:
yield
except PValidation as e:
fields = [str(x["loc"][0]) for x in e.errors()]
raise ValidationError(fields=fields) from e

except ImportError:
from faststream.utils.functions import fake_context

catch_startup_validation_error = fake_context
22 changes: 13 additions & 9 deletions faststream/cli/docs/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,21 @@
def serve(
app: str = typer.Argument(
...,
help="[python_module:FastStream] or [asyncapi.yaml/.json] - path to your application or documentation",
help="[python_module:FastStream] or [asyncapi.yaml/.json] - path to your application or documentation.",
),
host: str = typer.Option(
"localhost",
help="documentation hosting address",
help="Documentation hosting address.",
),
port: int = typer.Option(
8000,
help="documentation hosting port",
help="Documentation hosting port.",
),
reload: bool = typer.Option(
False,
"--reload",
is_flag=True,
help="Restart documentation at directory files changes",
help="Restart documentation at directory files changes.",
),
app_dir: str = typer.Option(
".",
Expand All @@ -45,7 +45,10 @@ def serve(
),
),
is_factory: bool = typer.Option(
False, "--factory", help="Treat APP as an application factory"
False,
"--factory",
is_flag=True,
help="Treat APP as an application factory.",
),
) -> None:
"""Serve project AsyncAPI schema."""
Expand Down Expand Up @@ -87,17 +90,17 @@ def serve(
def gen(
app: str = typer.Argument(
...,
help="[python_module:FastStream] - path to your application",
help="[python_module:FastStream] - path to your application.",
),
yaml: bool = typer.Option(
False,
"--yaml",
is_flag=True,
help="generate `asyncapi.yaml` schema",
help="Generate `asyncapi.yaml` schema.",
),
out: Optional[str] = typer.Option(
None,
help="output filename",
help="Output filename.",
),
app_dir: str = typer.Option(
".",
Expand All @@ -110,7 +113,8 @@ def gen(
is_factory: bool = typer.Option(
False,
"--factory",
help="Treat APP as an application factory",
is_flag=True,
help="Treat APP as an application factory.",
),
) -> None:
"""Generate project AsyncAPI schema."""
Expand Down
36 changes: 20 additions & 16 deletions faststream/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def main(
"--version",
callback=version_callback,
is_eager=True,
help="Show current platform, python and FastStream version",
help="Show current platform, python and FastStream version.",
),
) -> None:
"""Generate, run and manage FastStream apps to greater development experience."""
Expand All @@ -59,32 +59,31 @@ def run(
ctx: typer.Context,
app: str = typer.Argument(
...,
help="[python_module:FastStream] - path to your application",
help="[python_module:FastStream] - path to your application.",
),
workers: int = typer.Option(
1,
show_default=False,
help="Run [workers] applications with process spawning",
help="Run [workers] applications with process spawning.",
),
log_level: LogLevels = typer.Option(
LogLevels.info,
LogLevels.notset,
case_sensitive=False,
show_default=False,
help="[INFO] default",
help="Set selected level for FastStream and brokers logger objects.",
),
reload: bool = typer.Option(
False,
"--reload",
is_flag=True,
help="Restart app at directory files changes",
help="Restart app at directory files changes.",
),
watch_extensions: List[str] = typer.Option(
(),
"--extension",
"--ext",
"--reload-extension",
"--reload-ext",
help="List of file extensions to watch by",
help="List of file extensions to watch by.",
),
app_dir: str = typer.Option(
".",
Expand All @@ -98,7 +97,7 @@ def run(
False,
"--factory",
is_flag=True,
help="Treat APP as an application factory",
help="Treat APP as an application factory.",
),
) -> None:
"""Run [MODULE:APP] FastStream application."""
Expand Down Expand Up @@ -158,7 +157,7 @@ def _run(
app: str,
extra_options: Dict[str, "SettingField"],
is_factory: bool,
log_level: int = logging.INFO,
log_level: int = logging.NOTSET,
app_level: int = logging.INFO,
) -> None:
"""Runs the specified application."""
Expand All @@ -171,7 +170,8 @@ def _run(
f'Imported object "{app_obj}" must be "FastStream" type.',
)

set_log_level(log_level, app_obj)
if log_level > 0:
set_log_level(log_level, app_obj)

if sys.platform not in ("win32", "cygwin", "cli"): # pragma: no cover
with suppress(ImportError):
Expand All @@ -188,7 +188,11 @@ def _run(

except ValidationError as e:
ex = MissingParameter(
param=TyperOption(param_decls=[f"--{x}" for x in e.fields])
message=(
"You registered extra options in your application "
"`lifespan/on_startup` hook, but does not set in CLI."
),
param=TyperOption(param_decls=[f"--{x}" for x in e.fields]),
)

try:
Expand All @@ -206,14 +210,14 @@ def _run(
)
def publish(
ctx: typer.Context,
app: str = typer.Argument(..., help="FastStream app instance, e.g., main:app"),
message: str = typer.Argument(..., help="Message to be published"),
rpc: bool = typer.Option(False, help="Enable RPC mode and system output"),
app: str = typer.Argument(..., help="FastStream app instance, e.g., main:app."),
message: str = typer.Argument(..., help="Message to be published."),
rpc: bool = typer.Option(False, help="Enable RPC mode and system output."),
is_factory: bool = typer.Option(
False,
"--factory",
is_flag=True,
help="Treat APP as an application factory",
help="Treat APP as an application factory.",
),
) -> None:
"""Publish a message using the specified broker in a FastStream application.
Expand Down
14 changes: 10 additions & 4 deletions faststream/cli/utils/logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,26 @@ class LogLevels(str, Enum):
"""

critical = "critical"
fatal = "fatal"
error = "error"
warning = "warning"
warn = "warn"
info = "info"
debug = "debug"
notset = "notset"


LOG_LEVELS: DefaultDict[str, int] = defaultdict(
lambda: logging.INFO,
**{
"critical": logging.CRITICAL,
"fatal": logging.FATAL,
"error": logging.ERROR,
"warning": logging.WARNING,
"warn": logging.WARN,
"info": logging.INFO,
"debug": logging.DEBUG,
"notset": logging.NOTSET,
},
)

Expand All @@ -60,9 +66,9 @@ def get_log_level(level: Union[LogLevels, str, int]) -> int:

def set_log_level(level: int, app: "FastStream") -> None:
"""Sets the log level for an application."""
if app.logger and isinstance(app.logger, logging.Logger):
app.logger.setLevel(level)
if app.logger and getattr(app.logger, "setLevel", None):
app.logger.setLevel(level) # type: ignore[attr-defined]

broker_logger: Optional[LoggerProto] = getattr(app.broker, "logger", None)
if broker_logger is not None and isinstance(broker_logger, logging.Logger):
broker_logger.setLevel(level)
if broker_logger is not None and getattr(broker_logger, "setLevel", None):
broker_logger.setLevel(level) # type: ignore[attr-defined]
2 changes: 1 addition & 1 deletion faststream/confluent/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def __init__(
] = 40 * 1000,
retry_backoff_ms: Annotated[
int,
Doc(" Milliseconds to backoff when retrying on errors."),
Doc("Milliseconds to backoff when retrying on errors."),
] = 100,
metadata_max_age_ms: Annotated[
int,
Expand Down
2 changes: 1 addition & 1 deletion faststream/confluent/fastapi/fastapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def __init__(
] = 40 * 1000,
retry_backoff_ms: Annotated[
int,
Doc(" Milliseconds to backoff when retrying on errors."),
Doc("Milliseconds to backoff when retrying on errors."),
] = 100,
metadata_max_age_ms: Annotated[
int,
Expand Down
4 changes: 2 additions & 2 deletions faststream/kafka/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class KafkaInitKwargs(TypedDict, total=False):
]
retry_backoff_ms: Annotated[
int,
Doc(" Milliseconds to backoff when retrying on errors."),
Doc("Milliseconds to backoff when retrying on errors."),
]
metadata_max_age_ms: Annotated[
int,
Expand Down Expand Up @@ -258,7 +258,7 @@ def __init__(
] = 40 * 1000,
retry_backoff_ms: Annotated[
int,
Doc(" Milliseconds to backoff when retrying on errors."),
Doc("Milliseconds to backoff when retrying on errors."),
] = 100,
metadata_max_age_ms: Annotated[
int,
Expand Down
2 changes: 1 addition & 1 deletion faststream/kafka/fastapi/fastapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def __init__(
] = 40 * 1000,
retry_backoff_ms: Annotated[
int,
Doc(" Milliseconds to backoff when retrying on errors."),
Doc("Milliseconds to backoff when retrying on errors."),
] = 100,
metadata_max_age_ms: Annotated[
int,
Expand Down
Loading