diff --git a/docs/docs/SUMMARY.md b/docs/docs/SUMMARY.md index 56aabbb80f..f919573a1e 100644 --- a/docs/docs/SUMMARY.md +++ b/docs/docs/SUMMARY.md @@ -216,6 +216,7 @@ search: - [apply_types](api/faststream/apply_types.md) - app - [FastStream](api/faststream/app/FastStream.md) + - [catch_startup_validation_error](api/faststream/app/catch_startup_validation_error.md) - asgi - [AsgiFastStream](api/faststream/asgi/AsgiFastStream.md) - [AsgiResponse](api/faststream/asgi/AsgiResponse.md) diff --git a/docs/docs/en/api/faststream/app/catch_startup_validation_error.md b/docs/docs/en/api/faststream/app/catch_startup_validation_error.md new file mode 100644 index 0000000000..a53e4686f9 --- /dev/null +++ b/docs/docs/en/api/faststream/app/catch_startup_validation_error.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.app.catch_startup_validation_error diff --git a/faststream/app.py b/faststream/app.py index 8c5a554313..ebf71bfb1f 100644 --- a/faststream/app.py +++ b/faststream/app.py @@ -3,6 +3,7 @@ from typing import ( TYPE_CHECKING, Any, + AsyncIterator, Callable, Dict, List, @@ -161,7 +162,9 @@ async def run( set_exit(lambda *_: self.exit(), sync=False) - async with self.lifespan_context(**(run_extra_options or {})): + async with catch_startup_validation_error(), self.lifespan_context( + **(run_extra_options or {}) + ): try: async with anyio.create_task_group() as tg: tg.start_soon(self._startup, log_level, run_extra_options) @@ -186,20 +189,7 @@ async def start( ) -> None: """Executes startup hooks and start broker.""" for func in self._on_startup_calling: - call = func(**run_extra_options) - - try: - from pydantic import ValidationError as PValidation - - except ImportError: - await call - - else: - try: - await call - except PValidation as e: - fields = [str(x["loc"][0]) for x in e.errors()] - raise ValidationError(fields=fields) from e + await func(**run_extra_options) if self.broker is not None: await self.broker.start() @@ -237,3 +227,22 @@ async def _shutdown(self, log_level: int = logging.INFO) -> None: def _log(self, level: int, message: str) -> None: if self.logger is not None: self.logger.log(level, message) + + +try: + from contextlib import asynccontextmanager + + from pydantic import ValidationError as PValidation + + @asynccontextmanager + async def catch_startup_validation_error() -> AsyncIterator[None]: + try: + yield + except PValidation as e: + fields = [str(x["loc"][0]) for x in e.errors()] + raise ValidationError(fields=fields) from e + +except ImportError: + from faststream.utils.functions import fake_context + + catch_startup_validation_error = fake_context diff --git a/faststream/cli/docs/app.py b/faststream/cli/docs/app.py index 9fa9d5f51b..648390cf46 100644 --- a/faststream/cli/docs/app.py +++ b/faststream/cli/docs/app.py @@ -20,21 +20,21 @@ def serve( app: str = typer.Argument( ..., - help="[python_module:FastStream] or [asyncapi.yaml/.json] - path to your application or documentation", + help="[python_module:FastStream] or [asyncapi.yaml/.json] - path to your application or documentation.", ), host: str = typer.Option( "localhost", - help="documentation hosting address", + help="Documentation hosting address.", ), port: int = typer.Option( 8000, - help="documentation hosting port", + help="Documentation hosting port.", ), reload: bool = typer.Option( False, "--reload", is_flag=True, - help="Restart documentation at directory files changes", + help="Restart documentation at directory files changes.", ), app_dir: str = typer.Option( ".", @@ -45,7 +45,10 @@ def serve( ), ), is_factory: bool = typer.Option( - False, "--factory", help="Treat APP as an application factory" + False, + "--factory", + is_flag=True, + help="Treat APP as an application factory.", ), ) -> None: """Serve project AsyncAPI schema.""" @@ -87,17 +90,17 @@ def serve( def gen( app: str = typer.Argument( ..., - help="[python_module:FastStream] - path to your application", + help="[python_module:FastStream] - path to your application.", ), yaml: bool = typer.Option( False, "--yaml", is_flag=True, - help="generate `asyncapi.yaml` schema", + help="Generate `asyncapi.yaml` schema.", ), out: Optional[str] = typer.Option( None, - help="output filename", + help="Output filename.", ), app_dir: str = typer.Option( ".", @@ -110,7 +113,8 @@ def gen( is_factory: bool = typer.Option( False, "--factory", - help="Treat APP as an application factory", + is_flag=True, + help="Treat APP as an application factory.", ), ) -> None: """Generate project AsyncAPI schema.""" diff --git a/faststream/cli/main.py b/faststream/cli/main.py index 8f2380c0cc..900a36d810 100644 --- a/faststream/cli/main.py +++ b/faststream/cli/main.py @@ -46,7 +46,7 @@ def main( "--version", callback=version_callback, is_eager=True, - help="Show current platform, python and FastStream version", + help="Show current platform, python and FastStream version.", ), ) -> None: """Generate, run and manage FastStream apps to greater development experience.""" @@ -59,24 +59,23 @@ def run( ctx: typer.Context, app: str = typer.Argument( ..., - help="[python_module:FastStream] - path to your application", + help="[python_module:FastStream] - path to your application.", ), workers: int = typer.Option( 1, show_default=False, - help="Run [workers] applications with process spawning", + help="Run [workers] applications with process spawning.", ), log_level: LogLevels = typer.Option( - LogLevels.info, + LogLevels.notset, case_sensitive=False, - show_default=False, - help="[INFO] default", + help="Set selected level for FastStream and brokers logger objects.", ), reload: bool = typer.Option( False, "--reload", is_flag=True, - help="Restart app at directory files changes", + help="Restart app at directory files changes.", ), watch_extensions: List[str] = typer.Option( (), @@ -84,7 +83,7 @@ def run( "--ext", "--reload-extension", "--reload-ext", - help="List of file extensions to watch by", + help="List of file extensions to watch by.", ), app_dir: str = typer.Option( ".", @@ -98,7 +97,7 @@ def run( False, "--factory", is_flag=True, - help="Treat APP as an application factory", + help="Treat APP as an application factory.", ), ) -> None: """Run [MODULE:APP] FastStream application.""" @@ -158,7 +157,7 @@ def _run( app: str, extra_options: Dict[str, "SettingField"], is_factory: bool, - log_level: int = logging.INFO, + log_level: int = logging.NOTSET, app_level: int = logging.INFO, ) -> None: """Runs the specified application.""" @@ -171,7 +170,8 @@ def _run( f'Imported object "{app_obj}" must be "FastStream" type.', ) - set_log_level(log_level, app_obj) + if log_level > 0: + set_log_level(log_level, app_obj) if sys.platform not in ("win32", "cygwin", "cli"): # pragma: no cover with suppress(ImportError): @@ -188,7 +188,11 @@ def _run( except ValidationError as e: ex = MissingParameter( - param=TyperOption(param_decls=[f"--{x}" for x in e.fields]) + message=( + "You registered extra options in your application " + "`lifespan/on_startup` hook, but does not set in CLI." + ), + param=TyperOption(param_decls=[f"--{x}" for x in e.fields]), ) try: @@ -206,14 +210,14 @@ def _run( ) def publish( ctx: typer.Context, - app: str = typer.Argument(..., help="FastStream app instance, e.g., main:app"), - message: str = typer.Argument(..., help="Message to be published"), - rpc: bool = typer.Option(False, help="Enable RPC mode and system output"), + app: str = typer.Argument(..., help="FastStream app instance, e.g., main:app."), + message: str = typer.Argument(..., help="Message to be published."), + rpc: bool = typer.Option(False, help="Enable RPC mode and system output."), is_factory: bool = typer.Option( False, "--factory", is_flag=True, - help="Treat APP as an application factory", + help="Treat APP as an application factory.", ), ) -> None: """Publish a message using the specified broker in a FastStream application. diff --git a/faststream/cli/utils/logs.py b/faststream/cli/utils/logs.py index 2f223455f6..b576db49ee 100644 --- a/faststream/cli/utils/logs.py +++ b/faststream/cli/utils/logs.py @@ -20,20 +20,26 @@ class LogLevels(str, Enum): """ critical = "critical" + fatal = "fatal" error = "error" warning = "warning" + warn = "warn" info = "info" debug = "debug" + notset = "notset" LOG_LEVELS: DefaultDict[str, int] = defaultdict( lambda: logging.INFO, **{ "critical": logging.CRITICAL, + "fatal": logging.FATAL, "error": logging.ERROR, "warning": logging.WARNING, + "warn": logging.WARN, "info": logging.INFO, "debug": logging.DEBUG, + "notset": logging.NOTSET, }, ) @@ -60,9 +66,9 @@ def get_log_level(level: Union[LogLevels, str, int]) -> int: def set_log_level(level: int, app: "FastStream") -> None: """Sets the log level for an application.""" - if app.logger and isinstance(app.logger, logging.Logger): - app.logger.setLevel(level) + if app.logger and getattr(app.logger, "setLevel", None): + app.logger.setLevel(level) # type: ignore[attr-defined] broker_logger: Optional[LoggerProto] = getattr(app.broker, "logger", None) - if broker_logger is not None and isinstance(broker_logger, logging.Logger): - broker_logger.setLevel(level) + if broker_logger is not None and getattr(broker_logger, "setLevel", None): + broker_logger.setLevel(level) # type: ignore[attr-defined] diff --git a/faststream/confluent/broker/broker.py b/faststream/confluent/broker/broker.py index 7d94f7c123..e290a276f5 100644 --- a/faststream/confluent/broker/broker.py +++ b/faststream/confluent/broker/broker.py @@ -87,7 +87,7 @@ def __init__( ] = 40 * 1000, retry_backoff_ms: Annotated[ int, - Doc(" Milliseconds to backoff when retrying on errors."), + Doc("Milliseconds to backoff when retrying on errors."), ] = 100, metadata_max_age_ms: Annotated[ int, diff --git a/faststream/confluent/fastapi/fastapi.py b/faststream/confluent/fastapi/fastapi.py index 6c09903284..b4a2e6bba4 100644 --- a/faststream/confluent/fastapi/fastapi.py +++ b/faststream/confluent/fastapi/fastapi.py @@ -93,7 +93,7 @@ def __init__( ] = 40 * 1000, retry_backoff_ms: Annotated[ int, - Doc(" Milliseconds to backoff when retrying on errors."), + Doc("Milliseconds to backoff when retrying on errors."), ] = 100, metadata_max_age_ms: Annotated[ int, diff --git a/faststream/kafka/broker/broker.py b/faststream/kafka/broker/broker.py index 0edfc97e73..3fa4573ffa 100644 --- a/faststream/kafka/broker/broker.py +++ b/faststream/kafka/broker/broker.py @@ -64,7 +64,7 @@ class KafkaInitKwargs(TypedDict, total=False): ] retry_backoff_ms: Annotated[ int, - Doc(" Milliseconds to backoff when retrying on errors."), + Doc("Milliseconds to backoff when retrying on errors."), ] metadata_max_age_ms: Annotated[ int, @@ -258,7 +258,7 @@ def __init__( ] = 40 * 1000, retry_backoff_ms: Annotated[ int, - Doc(" Milliseconds to backoff when retrying on errors."), + Doc("Milliseconds to backoff when retrying on errors."), ] = 100, metadata_max_age_ms: Annotated[ int, diff --git a/faststream/kafka/fastapi/fastapi.py b/faststream/kafka/fastapi/fastapi.py index d1ee019b1d..d8fc7331ee 100644 --- a/faststream/kafka/fastapi/fastapi.py +++ b/faststream/kafka/fastapi/fastapi.py @@ -97,7 +97,7 @@ def __init__( ] = 40 * 1000, retry_backoff_ms: Annotated[ int, - Doc(" Milliseconds to backoff when retrying on errors."), + Doc("Milliseconds to backoff when retrying on errors."), ] = 100, metadata_max_age_ms: Annotated[ int,