diff --git a/.flake8 b/.flake8 index d01801b..1590469 100644 --- a/.flake8 +++ b/.flake8 @@ -80,6 +80,14 @@ ignore = WPS229, ; Found function with too much cognitive complexity WPS231, + ; Found too deep nesting + WPS220, + ; Found line with high Jones Complexity + WPS221, + ; function name should be lowercase + N802, + ; Do not perform function calls in argument defaults. + B008, ; all init files __init__.py: @@ -99,6 +107,10 @@ per-file-ignores = WPS432, ; Missing parameter(s) in Docstring DAR101, + ; Found too short name + WPS111, + ; Found complex default value + WPS404, exclude = ./.git, diff --git a/docs/examples/introduction/aio_pika_broker.py b/docs/examples/introduction/aio_pika_broker.py index 452340e..980361d 100644 --- a/docs/examples/introduction/aio_pika_broker.py +++ b/docs/examples/introduction/aio_pika_broker.py @@ -22,6 +22,7 @@ async def main() -> None: print(f"Returned value: {result.return_value}") else: print("Error found while executing task.") + await broker.shutdown() if __name__ == "__main__": diff --git a/docs/examples/introduction/full_example.py b/docs/examples/introduction/full_example.py index 47ed3f2..1a7e515 100644 --- a/docs/examples/introduction/full_example.py +++ b/docs/examples/introduction/full_example.py @@ -26,6 +26,7 @@ async def main() -> None: print(f"Returned value: {result.return_value}") else: print("Error found while executing task.") + await broker.shutdown() if __name__ == "__main__": diff --git a/docs/examples/introduction/inmemory_run.py b/docs/examples/introduction/inmemory_run.py index 2bff495..1e81456 100644 --- a/docs/examples/introduction/inmemory_run.py +++ b/docs/examples/introduction/inmemory_run.py @@ -12,6 +12,7 @@ async def add_one(value: int) -> int: async def main() -> None: + await broker.startup() # Send the task to the broker. task = await add_one.kiq(1) # Wait for the result. 
@@ -21,6 +22,7 @@ async def main() -> None: print(f"Returned value: {result.return_value}") else: print("Error found while executing task.") + await broker.shutdown() if __name__ == "__main__": diff --git a/docs/examples/state/async_generator_deps.py b/docs/examples/state/async_generator_deps.py new file mode 100644 index 0000000..ad7157a --- /dev/null +++ b/docs/examples/state/async_generator_deps.py @@ -0,0 +1,18 @@ +import asyncio +from typing import AsyncGenerator + +from taskiq import TaskiqDepends + + +async def dependency() -> AsyncGenerator[str, None]: + print("Startup") + await asyncio.sleep(0.1) + + yield "value" + + await asyncio.sleep(0.1) + print("Shutdown") + + +async def my_task(dep: str = TaskiqDepends(dependency)) -> None: + print(dep.upper()) diff --git a/docs/examples/state/class_dependency.py b/docs/examples/state/class_dependency.py new file mode 100644 index 0000000..ba9f1e8 --- /dev/null +++ b/docs/examples/state/class_dependency.py @@ -0,0 +1,17 @@ +from taskiq import TaskiqDepends + + +async def db_connection() -> str: + return "let's pretend as this is a connection" + + +class MyDAO: + def __init__(self, db_conn: str = TaskiqDepends(db_connection)) -> None: + self.db_conn = db_conn + + def get_users(self) -> str: + return self.db_conn.upper() + + +def my_task(dao: MyDAO = TaskiqDepends()) -> None: + print(dao.get_users()) diff --git a/docs/examples/state/dependencies_tree.py b/docs/examples/state/dependencies_tree.py new file mode 100644 index 0000000..1818c7e --- /dev/null +++ b/docs/examples/state/dependencies_tree.py @@ -0,0 +1,26 @@ +import random + +from taskiq import TaskiqDepends + + +def common_dep() -> int: + # For example it returns 8 + return random.randint(1, 10) + + +def dep1(cd: int = TaskiqDepends(common_dep)) -> int: + # This function will return 9 + return cd + 1 + + +def dep2(cd: int = TaskiqDepends(common_dep)) -> int: + # This function will return 10 + return cd + 2 + + +def my_task( + d1: int = TaskiqDepends(dep1), + 
d2: int = TaskiqDepends(dep2), +) -> int: + # This function will return 19 + return d1 + d2 diff --git a/docs/examples/state/events_example.py b/docs/examples/state/events_example.py new file mode 100644 index 0000000..9df9a7b --- /dev/null +++ b/docs/examples/state/events_example.py @@ -0,0 +1,65 @@ +import asyncio +from typing import Optional + +from redis.asyncio import ConnectionPool, Redis # type: ignore +from taskiq_aio_pika import AioPikaBroker +from taskiq_redis import RedisAsyncResultBackend + +from taskiq import Context, TaskiqDepends, TaskiqEvents, TaskiqState + +# To run this example, please install: +# * taskiq +# * taskiq-redis +# * taskiq-aio-pika + +broker = AioPikaBroker( + "amqp://localhost", + result_backend=RedisAsyncResultBackend( + "redis://localhost/0", + ), +) + + +@broker.on_event(TaskiqEvents.WORKER_STARTUP) +async def startup(state: TaskiqState) -> None: + # Here we store connection pool on startup for later use. + state.redis = ConnectionPool.from_url("redis://localhost/1") + + +@broker.on_event(TaskiqEvents.WORKER_SHUTDOWN) +async def shutdown(state: TaskiqState) -> None: + # Here we close our pool on shutdown event. + await state.redis.disconnect() + + +@broker.task +async def get_val(key: str, context: Context = TaskiqDepends()) -> Optional[str]: + # Now we can use our pool. + redis = Redis(connection_pool=context.state.redis, decode_responses=True) + return await redis.get(key) + + +@broker.task +async def set_val(key: str, value: str, context: Context = TaskiqDepends()) -> None: + # Now we can use our pool to set value. + await Redis(connection_pool=context.state.redis).set(key, value) + + +async def main() -> None: + await broker.startup() + + set_task = await set_val.kiq("key", "value") + set_result = await set_task.wait_result(with_logs=True) + if set_result.is_err: + print(set_result.log) + raise ValueError("Cannot set value in redis. 
See logs.") + + get_task = await get_val.kiq("key") + get_res = await get_task.wait_result() + print(f"Got redis value: {get_res.return_value}") + + await broker.shutdown() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/docs/examples/state/generator_deps.py b/docs/examples/state/generator_deps.py new file mode 100644 index 0000000..65f78b2 --- /dev/null +++ b/docs/examples/state/generator_deps.py @@ -0,0 +1,15 @@ +from typing import Generator + +from taskiq import TaskiqDepends + + +def dependency() -> Generator[str, None, None]: + print("Startup") + + yield "value" + + print("Shutdown") + + +async def my_task(dep: str = TaskiqDepends(dependency)) -> None: + print(dep.upper()) diff --git a/docs/examples/state/no_cache.py b/docs/examples/state/no_cache.py new file mode 100644 index 0000000..eb968ba --- /dev/null +++ b/docs/examples/state/no_cache.py @@ -0,0 +1,22 @@ +import random + +from taskiq import TaskiqDepends + + +def common_dep() -> int: + return random.randint(1, 10) + + +def dep1(cd: int = TaskiqDepends(common_dep)) -> int: + return cd + 1 + + +def dep2(cd: int = TaskiqDepends(common_dep, use_cache=False)) -> int: + return cd + 2 + + +def my_task( + d1: int = TaskiqDepends(dep1), + d2: int = TaskiqDepends(dep2), +) -> int: + return d1 + d2 diff --git a/docs/guide/getting-started.md b/docs/guide/getting-started.md index d5a4c5c..0c2abdf 100644 --- a/docs/guide/getting-started.md +++ b/docs/guide/getting-started.md @@ -54,10 +54,16 @@ from taskiq import InMemoryBroker broker = InMemoryBroker() ``` -And that's it. Now let's add some tasks and the main function. You can add tasks in separate modules. You can find more information about that further. +And that's it. Now let's add some tasks and the main function. You can add tasks in separate modules. You can find more information about that further. Also, we call the `startup` method at the beginning of the `main` function. 
@[code python](../examples/introduction/inmemory_run.py) +::: tip Cool tip! + +Calling the `startup` method is not required, but we strongly recommend you do so. + +::: + If you run this code, you will get this in your terminal: ```bash:no-line-numbers diff --git a/docs/guide/scheduling-tasks.md b/docs/guide/scheduling-tasks.md index 571eae0..7075765 100644 --- a/docs/guide/scheduling-tasks.md +++ b/docs/guide/scheduling-tasks.md @@ -1,5 +1,5 @@ --- -order: 7 +order: 8 --- # Scheduling tasks @@ -45,6 +45,12 @@ it may execute one task N times, where N is the number of running scheduler inst This command will import the scheduler you defined and start sending tasks to your broker. +::: tip Cool tip! + +The scheduler doesn't execute tasks. It only sends them. + +::: + You can check list of available schedule sources in the [Available schedule sources](../available-components/schedule-sources.md) section. diff --git a/docs/guide/state-and-deps.md b/docs/guide/state-and-deps.md new file mode 100644 index 0000000..09fe046 --- /dev/null +++ b/docs/guide/state-and-deps.md @@ -0,0 +1,172 @@ +--- +order: 7 +--- + +# State and Dependencies + + +## State + +The `TaskiqState` is a global variable where you can keep the variables you want to use later. +For example, you want to open a database connection pool at a broker's startup. + +This can be achieved by adding event handlers. + +You can use one of these events: +* `WORKER_STARTUP` +* `CLIENT_STARTUP` +* `WORKER_SHUTDOWN` +* `CLIENT_SHUTDOWN` + +Worker events are called when you start listening to the broker messages using taskiq. +Client events are called when you call the `startup` method of your broker from your code. + +This is an example of code using event handlers: + +@[code python](../examples/state/events_example.py) + +::: tip Cool tip! + +If you want to add handlers programmatically, you can use the `broker.add_event_handler` function.
+ + +::: + +As you can see in this example, this worker will initialize the Redis pool at the startup. +You can access the state from the context. + + +## Dependencies + +Using context directly is nice, but this way won't get completion. + +That's why we suggest you try TaskiqDependencies. The implementation is very similar to FastAPI's dependencies. You can use classes, functions, and generators as dependencies. + +::: danger Cool alarm! + +FastAPI's `Depends` is not compatible with `TaskiqDepends`. + +::: + +### How dependencies are useful + +You can use dependencies for better autocompletion and reduce the amount of code you write. +Since the state is generic, we cannot guess the types of the state fields. +Dependencies can be annotated with type hints and therefore provide better auto-completion. + +Let's assume that you've stored a Redis connection pool in the state as in the example above. +```python +@broker.on_event(TaskiqEvents.WORKER_STARTUP) +async def startup(state: TaskiqState) -> None: + # Here we store connection pool on startup for later use. + state.redis = ConnectionPool.from_url("redis://localhost/1") + +``` + +You can access this variable by using the current execution context directly, like this: + +```python +@broker.task +async def my_task(context: Context = TaskiqDepends()) -> None: + async with Redis(connection_pool=context.state.redis, decode_responses=True) as redis: + await redis.set('key', 'value') +``` + +If you hit the `TAB` button after the `context.state.` expression, your IDE won't give you any auto-completion. +But we can create a dependency function to add auto-completion. + +```python + +def redis_dep(context: Context = TaskiqDepends()) -> Redis: + return Redis(connection_pool=context.state.redis, decode_responses=True) + +@broker.task +async def my_task(redis: Redis = TaskiqDepends(redis_dep)) -> None: + await redis.set('key', 'value') + +``` + +Now, this dependency injection will be autocompleted.
But, of course, state fields cannot be autocompleted, +even in dependencies. But this way, you won't make any typos while writing tasks. + + +### How do dependencies work + +We build a graph of dependencies on startup. If the parameter of the function has +the default value of `TaskiqDepends` this parameter will be treated as a dependency. + +Dependencies can also depend on something. Also dependencies are optimized to **not** evaluate things many times. + +For example: + +@[code python](../examples/state/dependencies_tree.py) + +In this code, the dependency `common_dep` is going to be evaluated only once and the `dep1` and the `dep2` are going to receive the same value. You can control this behaviour by using the `use_cache=False` parameter of your dependency. This parameter will force the +dependency to reevaluate all its subdependencies. + + +In this example we cannot predict the result. Since the `dep2` doesn't use cache for the `common_dep` function. +@[code python](../examples/state/no_cache.py) + +The graph for cached dependencies looks like this: + +```mermaid +graph TD + A[common_dep] + B[dep1] + C[dep2] + D[my_task] + A --> B + A --> C + B --> D + C --> D +``` + +The dependencies graph for `my_task` where `dep2` doesn't use cached value for `common_dep` looks like this: + +```mermaid +graph TD + A[common_dep] + B[dep1] + D[my_task] + C[dep2] + subgraph without cache + A1[common_dep] + end + A --> B + A1 --> C + B --> D + C --> D +``` + +### Class as a dependency + +You can use classes as dependencies, and they can also use other dependencies too. + +Let's see an example: + +@[code python](../examples/state/class_dependency.py) + +As you can see, the dependency for `my_task` function is declared with `TaskiqDepends()`. +It's because you can omit the class if it's declared in the type hint for the parameter. This feature doesn't +work with dependency functions, it's only for classes. + +You can pass dependencies for classes in the constructor.
+ +### Generator dependencies + +Generator dependencies are used to perform startup before task execution and teardown after the task execution. + +@[code python](../examples/state/generator_deps.py) + +In this example, we can do something at startup before the execution and at shutdown after the task is completed. + +If you want to do something asynchronously, convert this function to an asynchronous generator. Like this: + +@[code python](../examples/state/async_generator_deps.py) + + +### Default dependencies + +By default taskiq has only two dependencies: +* Context from `taskiq.context.Context` +* TaskiqState from `taskiq.state.TaskiqState` diff --git a/poetry.lock b/poetry.lock index 7f75a3b..5da7cc4 100644 --- a/poetry.lock +++ b/poetry.lock @@ -12,8 +12,8 @@ sniffio = ">=1.1" typing-extensions = {version = "*", markers = "python_version < \"3.8\""} [package.extras] -doc = ["packaging", "sphinx-rtd-theme", "sphinx-autodoc-typehints (>=1.2.0)"] -test = ["coverage[toml] (>=4.5)", "hypothesis (>=4.0)", "pytest (>=7.0)", "pytest-mock (>=3.6.1)", "trustme", "contextlib2", "uvloop (<0.15)", "mock (>=4)", "uvloop (>=0.15)"] +doc = ["packaging", "sphinx-autodoc-typehints (>=1.2.0)", "sphinx-rtd-theme"] +test = ["contextlib2", "coverage[toml] (>=4.5)", "hypothesis (>=4.0)", "mock (>=4)", "pytest (>=7.0)", "pytest-mock (>=3.6.1)", "trustme", "uvloop (<0.15)", "uvloop (>=0.15)"] trio = ["trio (>=0.16)"] [[package]] @@ -33,14 +33,14 @@ optional = false python-versions = ">=3.5" [package.extras] -dev = ["coverage[toml] (>=5.0.2)", "hypothesis", "pympler", "pytest (>=4.3.0)", "mypy (>=0.900,!=0.940)", "pytest-mypy-plugins", "zope.interface", "furo", "sphinx", "sphinx-notfound-page", "pre-commit", "cloudpickle"] -docs = ["furo", "sphinx", "zope.interface", "sphinx-notfound-page"] -tests = ["coverage[toml] (>=5.0.2)", "hypothesis", "pympler", "pytest (>=4.3.0)", "mypy (>=0.900,!=0.940)", "pytest-mypy-plugins", "zope.interface", "cloudpickle"] -tests_no_zope =
["coverage[toml] (>=5.0.2)", "hypothesis", "pympler", "pytest (>=4.3.0)", "mypy (>=0.900,!=0.940)", "pytest-mypy-plugins", "cloudpickle"] +dev = ["cloudpickle", "coverage[toml] (>=5.0.2)", "furo", "hypothesis", "mypy (>=0.900,!=0.940)", "pre-commit", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "sphinx", "sphinx-notfound-page", "zope.interface"] +docs = ["furo", "sphinx", "sphinx-notfound-page", "zope.interface"] +tests = ["cloudpickle", "coverage[toml] (>=5.0.2)", "hypothesis", "mypy (>=0.900,!=0.940)", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "zope.interface"] +tests_no_zope = ["cloudpickle", "coverage[toml] (>=5.0.2)", "hypothesis", "mypy (>=0.900,!=0.940)", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins"] [[package]] name = "autoflake" -version = "1.5.3" +version = "1.6.1" description = "Removes unused imports and unused variables" category = "dev" optional = false @@ -48,7 +48,7 @@ python-versions = ">=3.7" [package.dependencies] pyflakes = ">=1.1.0" -toml = ">=0.10.2" +tomli = {version = ">=2.0.1", markers = "python_version < \"3.11\""} [[package]] name = "bandit" @@ -65,9 +65,9 @@ PyYAML = ">=5.3.1" stevedore = ">=1.20.0" [package.extras] -test = ["coverage (>=4.5.4)", "fixtures (>=3.0.0)", "flake8 (>=4.0.0)", "stestr (>=2.5.0)", "testscenarios (>=0.5.0)", "testtools (>=2.3.0)", "toml", "beautifulsoup4 (>=4.8.0)", "pylint (==1.9.4)"] +test = ["beautifulsoup4 (>=4.8.0)", "coverage (>=4.5.4)", "fixtures (>=3.0.0)", "flake8 (>=4.0.0)", "pylint (==1.9.4)", "stestr (>=2.5.0)", "testscenarios (>=0.5.0)", "testtools (>=2.3.0)", "toml"] toml = ["toml"] -yaml = ["pyyaml"] +yaml = ["PyYAML"] [[package]] name = "black" @@ -241,7 +241,7 @@ flake8 = ">=3.5,<5" [[package]] name = "flake8-bugbear" -version = "22.8.23" +version = "22.9.23" description = "A plugin for flake8 finding likely bugs and design problems in your program. Contains warnings that don't belong in pyflakes and pycodestyle." 
category = "dev" optional = false @@ -303,16 +303,17 @@ pydocstyle = ">=2.1" [[package]] name = "flake8-eradicate" -version = "1.3.0" +version = "1.4.0" description = "Flake8 plugin to find commented out code" category = "dev" optional = false -python-versions = ">=3.6,<4.0" +python-versions = ">=3.7,<4.0" [package.dependencies] attrs = "*" eradicate = ">=2.0,<3.0" flake8 = ">=3.5,<6" +importlib-metadata = {version = "*", markers = "python_version < \"3.8\""} [[package]] name = "flake8-isort" @@ -395,7 +396,7 @@ optional = false python-versions = "*" [[package]] -name = "gitpython" +name = "GitPython" version = "3.1.27" description = "GitPython is a python library used to interact with Git repositories" category = "dev" @@ -406,6 +407,14 @@ python-versions = ">=3.7" gitdb = ">=4.0.1,<5" typing-extensions = {version = ">=3.7.4.3", markers = "python_version < \"3.8\""} +[[package]] +name = "graphlib-backport" +version = "1.0.3" +description = "Backport of the Python 3.9 graphlib module for Python 3.6+" +category = "main" +optional = false +python-versions = ">=3.6,<4.0" + [[package]] name = "identify" version = "2.5.5" @@ -419,7 +428,7 @@ license = ["ukkonen"] [[package]] name = "idna" -version = "3.3" +version = "3.4" description = "Internationalized Domain Names in Applications (IDNA)" category = "dev" optional = false @@ -438,8 +447,8 @@ typing-extensions = {version = ">=3.6.4", markers = "python_version < \"3.8\""} zipp = ">=0.5" [package.extras] -docs = ["sphinx", "jaraco.packaging (>=8.2)", "rst.linker (>=1.9)"] -testing = ["pytest (>=4.6)", "pytest-checkdocs (>=2.4)", "pytest-flake8", "pytest-cov", "pytest-enabler (>=1.0.1)", "packaging", "pep517", "pyfakefs", "flufl.flake8", "pytest-black (>=0.3.7)", "pytest-mypy", "importlib-resources (>=1.3)"] +docs = ["jaraco.packaging (>=8.2)", "rst.linker (>=1.9)", "sphinx"] +testing = ["flufl.flake8", "importlib-resources (>=1.3)", "packaging", "pep517", "pyfakefs", "pytest (>=4.6)", "pytest-black (>=0.3.7)", 
"pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=1.0.1)", "pytest-flake8", "pytest-mypy"] [[package]] name = "iniconfig" @@ -458,10 +467,10 @@ optional = false python-versions = ">=3.6.1,<4.0" [package.extras] -pipfile_deprecated_finder = ["pipreqs", "requirementslib"] -requirements_deprecated_finder = ["pipreqs", "pip-api"] colors = ["colorama (>=0.4.3,<0.5.0)"] +pipfile_deprecated_finder = ["pipreqs", "requirementslib"] plugins = ["setuptools"] +requirements_deprecated_finder = ["pip-api", "pipreqs"] [[package]] name = "mccabe" @@ -480,7 +489,7 @@ optional = false python-versions = ">=3.6" [package.extras] -build = ["twine", "wheel", "blurb"] +build = ["blurb", "twine", "wheel"] docs = ["sphinx"] test = ["pytest (<5.4)", "pytest-cov"] @@ -519,6 +528,9 @@ category = "dev" optional = false python-versions = ">=2.7,!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*" +[package.dependencies] +setuptools = "*" + [[package]] name = "packaging" version = "21.3" @@ -567,8 +579,8 @@ optional = false python-versions = ">=3.7" [package.extras] -docs = ["furo (>=2021.7.5b38)", "proselint (>=0.10.2)", "sphinx-autodoc-typehints (>=1.12)", "sphinx (>=4)"] -test = ["appdirs (==1.4.4)", "pytest-cov (>=2.7)", "pytest-mock (>=3.6)", "pytest (>=6)"] +docs = ["furo (>=2021.7.5b38)", "proselint (>=0.10.2)", "sphinx (>=4)", "sphinx-autodoc-typehints (>=1.12)"] +test = ["appdirs (==1.4.4)", "pytest (>=6)", "pytest-cov (>=2.7)", "pytest-mock (>=3.6)"] [[package]] name = "pluggy" @@ -611,7 +623,7 @@ optional = false python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" [package.extras] -test = ["ipaddress", "mock", "enum34", "pywin32", "wmi"] +test = ["enum34", "ipaddress", "mock", "pywin32", "wmi"] [[package]] name = "py" @@ -683,7 +695,7 @@ optional = false python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" [[package]] -name = "pygments" +name = "Pygments" version = "2.13.0" description = "Pygments is a syntax highlighting package written in Python." 
category = "dev" @@ -702,7 +714,7 @@ optional = false python-versions = ">=3.6.8" [package.extras] -diagrams = ["railroad-diagrams", "jinja2"] +diagrams = ["jinja2", "railroad-diagrams"] [[package]] name = "pytest" @@ -738,7 +750,7 @@ coverage = {version = ">=5.2.1", extras = ["toml"]} pytest = ">=4.6" [package.extras] -testing = ["fields", "hunter", "process-tests", "six", "pytest-xdist", "virtualenv"] +testing = ["fields", "hunter", "process-tests", "pytest-xdist", "six", "virtualenv"] [[package]] name = "pytest-forked" @@ -772,7 +784,7 @@ setproctitle = ["setproctitle"] testing = ["filelock"] [[package]] -name = "pyyaml" +name = "PyYAML" version = "6.0" description = "YAML parser and emitter for Python" category = "dev" @@ -802,6 +814,19 @@ python-versions = "*" [package.dependencies] docutils = ">=0.11,<1.0" +[[package]] +name = "setuptools" +version = "65.4.0" +description = "Easily download, build, install, upgrade, and uninstall Python packages" +category = "dev" +optional = false +python-versions = ">=3.7" + +[package.extras] +docs = ["furo", "jaraco.packaging (>=9)", "jaraco.tidelift (>=1.4)", "pygments-github-lexers (==0.0.5)", "rst.linker (>=1.9)", "sphinx", "sphinx-favicon", "sphinx-hoverxref (<2)", "sphinx-inline-tabs", "sphinx-notfound-page (==0.8.3)", "sphinx-reredirects", "sphinxcontrib-towncrier"] +testing = ["build[virtualenv]", "filelock (>=3.4.0)", "flake8 (<5)", "flake8-2020", "ini2toml[lite] (>=0.9)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "mock", "pip (>=19.1)", "pip-run (>=8.8)", "pytest (>=6)", "pytest-black (>=0.3.7)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=1.3)", "pytest-flake8", "pytest-mypy (>=0.9.1)", "pytest-perf", "pytest-xdist", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel"] +testing-integration = ["build[virtualenv]", "filelock (>=3.4.0)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "pytest", "pytest-enabler", "pytest-xdist", "tomli", "virtualenv (>=13.0.0)", "wheel"] + [[package]] name = 
"smmap" version = "5.0.0" @@ -888,16 +913,16 @@ python-versions = ">=3.7" [[package]] name = "uvloop" -version = "0.16.0" +version = "0.17.0" description = "Fast implementation of asyncio event loop on top of libuv" category = "main" optional = true python-versions = ">=3.7" [package.extras] -dev = ["Cython (>=0.29.24,<0.30.0)", "pytest (>=3.6.0)", "Sphinx (>=4.1.2,<4.2.0)", "sphinxcontrib-asyncio (>=0.3.0,<0.4.0)", "sphinx-rtd-theme (>=0.5.2,<0.6.0)", "aiohttp", "flake8 (>=3.9.2,<3.10.0)", "psutil", "pycodestyle (>=2.7.0,<2.8.0)", "pyOpenSSL (>=19.0.0,<19.1.0)", "mypy (>=0.800)"] -docs = ["Sphinx (>=4.1.2,<4.2.0)", "sphinxcontrib-asyncio (>=0.3.0,<0.4.0)", "sphinx-rtd-theme (>=0.5.2,<0.6.0)"] -test = ["aiohttp", "flake8 (>=3.9.2,<3.10.0)", "psutil", "pycodestyle (>=2.7.0,<2.8.0)", "pyOpenSSL (>=19.0.0,<19.1.0)", "mypy (>=0.800)"] +dev = ["Cython (>=0.29.32,<0.30.0)", "Sphinx (>=4.1.2,<4.2.0)", "aiohttp", "flake8 (>=3.9.2,<3.10.0)", "mypy (>=0.800)", "psutil", "pyOpenSSL (>=22.0.0,<22.1.0)", "pycodestyle (>=2.7.0,<2.8.0)", "pytest (>=3.6.0)", "sphinx-rtd-theme (>=0.5.2,<0.6.0)", "sphinxcontrib-asyncio (>=0.3.0,<0.4.0)"] +docs = ["Sphinx (>=4.1.2,<4.2.0)", "sphinx-rtd-theme (>=0.5.2,<0.6.0)", "sphinxcontrib-asyncio (>=0.3.0,<0.4.0)"] +test = ["Cython (>=0.29.32,<0.30.0)", "aiohttp", "flake8 (>=3.9.2,<3.10.0)", "mypy (>=0.800)", "psutil", "pyOpenSSL (>=22.0.0,<22.1.0)", "pycodestyle (>=2.7.0,<2.8.0)"] [[package]] name = "virtualenv" @@ -979,8 +1004,8 @@ optional = false python-versions = ">=3.7" [package.extras] -docs = ["sphinx", "jaraco.packaging (>=9)", "rst.linker (>=1.9)", "jaraco.tidelift (>=1.4)"] -testing = ["pytest (>=6)", "pytest-checkdocs (>=2.4)", "pytest-flake8", "pytest-cov", "pytest-enabler (>=1.3)", "jaraco.itertools", "func-timeout", "pytest-black (>=0.3.7)", "pytest-mypy (>=0.9.1)"] +docs = ["jaraco.packaging (>=9)", "jaraco.tidelift (>=1.4)", "rst.linker (>=1.9)", "sphinx"] +testing = ["func-timeout", "jaraco.itertools", "pytest (>=6)", 
"pytest-black (>=0.3.7)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=1.3)", "pytest-flake8", "pytest-mypy (>=0.9.1)"] [extras] uv = ["uvloop"] @@ -989,7 +1014,7 @@ zmq = ["pyzmq"] [metadata] lock-version = "1.1" python-versions = "^3.7" -content-hash = "fc7269926cf306cf1b11898b8cbd3f03bee332141831dbe0d79b4006f7ba8077" +content-hash = "ec069d8223f5ff127bdcbea9b254aa28c071901d29358e1d92a161932948460d" [metadata.files] anyio = [ @@ -1005,8 +1030,8 @@ attrs = [ {file = "attrs-22.1.0.tar.gz", hash = "sha256:29adc2665447e5191d0e7c568fde78b21f9672d344281d0c6e1ab085429b22b6"}, ] autoflake = [ - {file = "autoflake-1.5.3-py2.py3-none-any.whl", hash = "sha256:90eb8d3f625bd72068eb670338ea7efcddbc5c6e822d3601e3dc9404c06ea8da"}, - {file = "autoflake-1.5.3.tar.gz", hash = "sha256:44f7d7eb2c1c49505b513c0e93a5dfd3f7b4218283f50c5ca0af4df6b975d470"}, + {file = "autoflake-1.6.1-py2.py3-none-any.whl", hash = "sha256:dfef4c851fb07e6111f9115d3e7c8c52d8564cbf71c12ade2d8b8a2a7b8bd176"}, + {file = "autoflake-1.6.1.tar.gz", hash = "sha256:72bce741144ef6db26005d47dba242c1fd6a91ea53f7f4c5a90ad4b051e394c2"}, ] bandit = [ {file = "bandit-1.7.4-py3-none-any.whl", hash = "sha256:412d3f259dab4077d0e7f0c11f50f650cc7d10db905d98f6520a95a18049658a"}, @@ -1204,8 +1229,8 @@ flake8-broken-line = [ {file = "flake8_broken_line-0.4.0-py3-none-any.whl", hash = "sha256:e9c522856862239a2c7ef2c1de0276fa598572aa864bd4e9c7efc2a827538515"}, ] flake8-bugbear = [ - {file = "flake8-bugbear-22.8.23.tar.gz", hash = "sha256:de0717d11124a082118dd08387b34fd86b2721642ec2d8e92be66cfa5ea7c445"}, - {file = "flake8_bugbear-22.8.23-py3-none-any.whl", hash = "sha256:1b0ebe0873d1cd55bf9f1588bfcb930db339018ef44a3981a26532daa9fd14a8"}, + {file = "flake8-bugbear-22.9.23.tar.gz", hash = "sha256:17b9623325e6e0dcdcc80ed9e4aa811287fcc81d7e03313b8736ea5733759937"}, + {file = "flake8_bugbear-22.9.23-py3-none-any.whl", hash = "sha256:cd2779b2b7ada212d7a322814a1e5651f1868ab0d3f24cc9da66169ab8fda474"}, ] flake8-commas = [ 
{file = "flake8-commas-2.1.0.tar.gz", hash = "sha256:940441ab8ee544df564ae3b3f49f20462d75d5c7cac2463e0b27436e2050f263"}, @@ -1224,8 +1249,8 @@ flake8-docstrings = [ {file = "flake8_docstrings-1.6.0-py2.py3-none-any.whl", hash = "sha256:99cac583d6c7e32dd28bbfbef120a7c0d1b6dde4adb5a9fd441c4227a6534bde"}, ] flake8-eradicate = [ - {file = "flake8-eradicate-1.3.0.tar.gz", hash = "sha256:e4c98f00d17dc8653e3388cac2624cd81e9735de2fd4a8dcf99029633ebd7a63"}, - {file = "flake8_eradicate-1.3.0-py3-none-any.whl", hash = "sha256:85a71e0c5f4e07f7c6c5fec520483561fd6bd295417d622855bdeade99242e3d"}, + {file = "flake8-eradicate-1.4.0.tar.gz", hash = "sha256:3088cfd6717d1c9c6c3ac45ef2e5f5b6c7267f7504d5a74b781500e95cb9c7e1"}, + {file = "flake8_eradicate-1.4.0-py3-none-any.whl", hash = "sha256:e3bbd0871be358e908053c1ab728903c114f062ba596b4d40c852fd18f473d56"}, ] flake8-isort = [ {file = "flake8-isort-4.2.0.tar.gz", hash = "sha256:26571500cd54976bbc0cf1006ffbcd1a68dd102f816b7a1051b219616ba9fee0"}, @@ -1253,17 +1278,21 @@ gitdb = [ gitignore-parser = [ {file = "gitignore_parser-0.1.0.tar.gz", hash = "sha256:7efe2677dd433b784c3f28c7246478e6ff442e22ff56d54eb6b9e9cec96e873c"}, ] -gitpython = [ +GitPython = [ {file = "GitPython-3.1.27-py3-none-any.whl", hash = "sha256:5b68b000463593e05ff2b261acff0ff0972df8ab1b70d3cdbd41b546c8b8fc3d"}, {file = "GitPython-3.1.27.tar.gz", hash = "sha256:1c885ce809e8ba2d88a29befeb385fcea06338d3640712b59ca623c220bb5704"}, ] +graphlib-backport = [ + {file = "graphlib_backport-1.0.3-py3-none-any.whl", hash = "sha256:24246967b9e7e6a91550bc770e6169585d35aa32790258579a8a3899a8c18fde"}, + {file = "graphlib_backport-1.0.3.tar.gz", hash = "sha256:7bb8fc7757b8ae4e6d8000a26cd49e9232aaa9a3aa57edb478474b8424bfaae2"}, +] identify = [ {file = "identify-2.5.5-py2.py3-none-any.whl", hash = "sha256:ef78c0d96098a3b5fe7720be4a97e73f439af7cf088ebf47b620aeaa10fadf97"}, {file = "identify-2.5.5.tar.gz", hash = "sha256:322a5699daecf7c6fd60e68852f36f2ecbb6a36ff6e6e973e0d2bb6fca203ee6"}, ] 
idna = [ - {file = "idna-3.3-py3-none-any.whl", hash = "sha256:84d9dd047ffa80596e0f246e2eab0b391788b0503584e8945f2368256d2735ff"}, - {file = "idna-3.3.tar.gz", hash = "sha256:9d643ff0a55b762d5cdb124b8eaa99c66322e2157b69160bc32796e824360e6d"}, + {file = "idna-3.4-py3-none-any.whl", hash = "sha256:90b77e79eaa3eba6de819a0c442c0b4ceefc341a7a2ab77d7562bf49f425c5c2"}, + {file = "idna-3.4.tar.gz", hash = "sha256:814f528e8dead7d329833b91c5faa87d60bf71824cd12a7530b5526063d02cb4"}, ] importlib-metadata = [ {file = "importlib_metadata-4.2.0-py3-none-any.whl", hash = "sha256:057e92c15bc8d9e8109738a48db0ccb31b4d9d5cfbee5a8670879a30be66304b"}, @@ -1441,7 +1470,7 @@ pyflakes = [ {file = "pyflakes-2.4.0-py2.py3-none-any.whl", hash = "sha256:3bb3a3f256f4b7968c9c788781e4ff07dce46bdf12339dcda61053375426ee2e"}, {file = "pyflakes-2.4.0.tar.gz", hash = "sha256:05a85c2872edf37a4ed30b0cce2f6093e1d0581f8c19d7393122da7e25b2b24c"}, ] -pygments = [ +Pygments = [ {file = "Pygments-2.13.0-py3-none-any.whl", hash = "sha256:f643f331ab57ba3c9d89212ee4a2dabc6e94f117cf4eefde99a0574720d14c42"}, {file = "Pygments-2.13.0.tar.gz", hash = "sha256:56a8508ae95f98e2b9bdf93a6be5ae3f7d8af858b43e02c5a2ff083726be40c1"}, ] @@ -1465,7 +1494,7 @@ pytest-xdist = [ {file = "pytest-xdist-2.5.0.tar.gz", hash = "sha256:4580deca3ff04ddb2ac53eba39d76cb5dd5edeac050cb6fbc768b0dd712b4edf"}, {file = "pytest_xdist-2.5.0-py3-none-any.whl", hash = "sha256:6fe5c74fec98906deb8f2d2b616b5c782022744978e7bd4695d39c8f42d0ce65"}, ] -pyyaml = [ +PyYAML = [ {file = "PyYAML-6.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:d4db7c7aef085872ef65a8fd7d6d09a14ae91f691dec3e87ee5ee0539d516f53"}, {file = "PyYAML-6.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:9df7ed3b3d2e0ecfe09e14741b857df43adb5a3ddadc919a2d94fbdf78fea53c"}, {file = "PyYAML-6.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:77f396e6ef4c73fdc33a9157446466f1cff553d979bd00ecb64385760c6babdc"}, @@ -1473,6 +1502,13 @@ pyyaml = [ {file = 
"PyYAML-6.0-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:f84fbc98b019fef2ee9a1cb3ce93e3187a6df0b2538a651bfb890254ba9f90b5"}, {file = "PyYAML-6.0-cp310-cp310-win32.whl", hash = "sha256:2cd5df3de48857ed0544b34e2d40e9fac445930039f3cfe4bcc592a1f836d513"}, {file = "PyYAML-6.0-cp310-cp310-win_amd64.whl", hash = "sha256:daf496c58a8c52083df09b80c860005194014c3698698d1a57cbcfa182142a3a"}, + {file = "PyYAML-6.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:d4b0ba9512519522b118090257be113b9468d804b19d63c71dbcf4a48fa32358"}, + {file = "PyYAML-6.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:81957921f441d50af23654aa6c5e5eaf9b06aba7f0a19c18a538dc7ef291c5a1"}, + {file = "PyYAML-6.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:afa17f5bc4d1b10afd4466fd3a44dc0e245382deca5b3c353d8b757f9e3ecb8d"}, + {file = "PyYAML-6.0-cp311-cp311-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:dbad0e9d368bb989f4515da330b88a057617d16b6a8245084f1b05400f24609f"}, + {file = "PyYAML-6.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:432557aa2c09802be39460360ddffd48156e30721f5e8d917f01d31694216782"}, + {file = "PyYAML-6.0-cp311-cp311-win32.whl", hash = "sha256:bfaef573a63ba8923503d27530362590ff4f576c626d86a9fed95822a8255fd7"}, + {file = "PyYAML-6.0-cp311-cp311-win_amd64.whl", hash = "sha256:01b45c0191e6d66c470b6cf1b9531a771a83c1c4208272ead47a3ae4f2f603bf"}, {file = "PyYAML-6.0-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:897b80890765f037df3403d22bab41627ca8811ae55e9a722fd0392850ec4d86"}, {file = "PyYAML-6.0-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:50602afada6d6cbfad699b0c7bb50d5ccffa7e46a3d738092afddc1f9758427f"}, {file = "PyYAML-6.0-cp36-cp36m-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:48c346915c114f5fdb3ead70312bd042a953a8ce5c7106d5bfb1a5254e47da92"}, @@ -1539,6 +1575,7 @@ pyzmq = [ {file 
= "pyzmq-23.2.1-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:48400b96788cdaca647021bf19a9cd668384f46e4d9c55cf045bdd17f65299c8"}, {file = "pyzmq-23.2.1-cp37-cp37m-win32.whl", hash = "sha256:8a68f57b7a3f7b6b52ada79876be1efb97c8c0952423436e84d70cc139f16f0d"}, {file = "pyzmq-23.2.1-cp37-cp37m-win_amd64.whl", hash = "sha256:9e5bf6e7239fc9687239de7a283aa8b801ab85371116045b33ae20132a1325d6"}, + {file = "pyzmq-23.2.1-cp38-cp38-macosx_10_15_universal2.whl", hash = "sha256:0ff6294e001129a9f22dcbfba186165c7e6f573c46de2704d76f873c94c65416"}, {file = "pyzmq-23.2.1-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:ffc6b1623d0f9affb351db4ca61f432dca3628a5ee015f9bf2bfbe9c6836881c"}, {file = "pyzmq-23.2.1-cp38-cp38-manylinux_2_12_i686.manylinux2010_i686.whl", hash = "sha256:4d6f110c56f7d5b4d64dde3a382ae61b6d48174e30742859d8e971b18b6c9e5c"}, {file = "pyzmq-23.2.1-cp38-cp38-manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:9269fbfe3a4eb2009199120861c4571ef1655fdf6951c3e7f233567c94e8c602"}, @@ -1578,6 +1615,10 @@ pyzmq = [ restructuredtext-lint = [ {file = "restructuredtext_lint-1.4.0.tar.gz", hash = "sha256:1b235c0c922341ab6c530390892eb9e92f90b9b75046063e047cacfb0f050c45"}, ] +setuptools = [ + {file = "setuptools-65.4.0-py3-none-any.whl", hash = "sha256:c2d2709550f15aab6c9110196ea312f468f41cd546bceb24127a1be6fdcaeeb1"}, + {file = "setuptools-65.4.0.tar.gz", hash = "sha256:a8f6e213b4b0661f590ccf40de95d28a177cd747d098624ad3f69c40287297e9"}, +] smmap = [ {file = "smmap-5.0.0-py3-none-any.whl", hash = "sha256:2aba19d6a040e78d8b09de5c57e96207b09ed71d8e55ce0959eeee6c8e190d94"}, {file = "smmap-5.0.0.tar.gz", hash = "sha256:c840e62059cd3be204b0c9c9f74be2c09d5648eddd4580d9314c3ecde0b30936"}, @@ -1641,22 +1682,36 @@ typing-extensions = [ {file = "typing_extensions-4.3.0.tar.gz", hash = "sha256:e6d2677a32f47fc7eb2795db1dd15c1f34eff616bcaf2cfb5e997f854fa1c4a6"}, ] uvloop = [ - {file = "uvloop-0.16.0-cp310-cp310-macosx_10_9_universal2.whl", hash = 
"sha256:6224f1401025b748ffecb7a6e2652b17768f30b1a6a3f7b44660e5b5b690b12d"}, - {file = "uvloop-0.16.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:30ba9dcbd0965f5c812b7c2112a1ddf60cf904c1c160f398e7eed3a6b82dcd9c"}, - {file = "uvloop-0.16.0-cp310-cp310-manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:bd53f7f5db562f37cd64a3af5012df8cac2c464c97e732ed556800129505bd64"}, - {file = "uvloop-0.16.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:772206116b9b57cd625c8a88f2413df2fcfd0b496eb188b82a43bed7af2c2ec9"}, - {file = "uvloop-0.16.0-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:b572256409f194521a9895aef274cea88731d14732343da3ecdb175228881638"}, - {file = "uvloop-0.16.0-cp37-cp37m-manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:04ff57aa137230d8cc968f03481176041ae789308b4d5079118331ab01112450"}, - {file = "uvloop-0.16.0-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:3a19828c4f15687675ea912cc28bbcb48e9bb907c801873bd1519b96b04fb805"}, - {file = "uvloop-0.16.0-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:e814ac2c6f9daf4c36eb8e85266859f42174a4ff0d71b99405ed559257750382"}, - {file = "uvloop-0.16.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:bd8f42ea1ea8f4e84d265769089964ddda95eb2bb38b5cbe26712b0616c3edee"}, - {file = "uvloop-0.16.0-cp38-cp38-manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:647e481940379eebd314c00440314c81ea547aa636056f554d491e40503c8464"}, - {file = "uvloop-0.16.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:8e0d26fa5875d43ddbb0d9d79a447d2ace4180d9e3239788208527c4784f7cab"}, - {file = "uvloop-0.16.0-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:6ccd57ae8db17d677e9e06192e9c9ec4bd2066b77790f9aa7dede2cc4008ee8f"}, - {file = "uvloop-0.16.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:089b4834fd299d82d83a25e3335372f12117a7d38525217c2258e9b9f4578897"}, - {file = 
"uvloop-0.16.0-cp39-cp39-manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:98d117332cc9e5ea8dfdc2b28b0a23f60370d02e1395f88f40d1effd2cb86c4f"}, - {file = "uvloop-0.16.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1e5f2e2ff51aefe6c19ee98af12b4ae61f5be456cd24396953244a30880ad861"}, - {file = "uvloop-0.16.0.tar.gz", hash = "sha256:f74bc20c7b67d1c27c72601c78cf95be99d5c2cdd4514502b4f3eb0933ff1228"}, + {file = "uvloop-0.17.0-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:ce9f61938d7155f79d3cb2ffa663147d4a76d16e08f65e2c66b77bd41b356718"}, + {file = "uvloop-0.17.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:68532f4349fd3900b839f588972b3392ee56042e440dd5873dfbbcd2cc67617c"}, + {file = "uvloop-0.17.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:0949caf774b9fcefc7c5756bacbbbd3fc4c05a6b7eebc7c7ad6f825b23998d6d"}, + {file = "uvloop-0.17.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ff3d00b70ce95adce264462c930fbaecb29718ba6563db354608f37e49e09024"}, + {file = "uvloop-0.17.0-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:a5abddb3558d3f0a78949c750644a67be31e47936042d4f6c888dd6f3c95f4aa"}, + {file = "uvloop-0.17.0-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:8efcadc5a0003d3a6e887ccc1fb44dec25594f117a94e3127954c05cf144d811"}, + {file = "uvloop-0.17.0-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:3378eb62c63bf336ae2070599e49089005771cc651c8769aaad72d1bd9385a7c"}, + {file = "uvloop-0.17.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:6aafa5a78b9e62493539456f8b646f85abc7093dd997f4976bb105537cf2635e"}, + {file = "uvloop-0.17.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c686a47d57ca910a2572fddfe9912819880b8765e2f01dc0dd12a9bf8573e539"}, + {file = "uvloop-0.17.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = 
"sha256:864e1197139d651a76c81757db5eb199db8866e13acb0dfe96e6fc5d1cf45fc4"}, + {file = "uvloop-0.17.0-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:2a6149e1defac0faf505406259561bc14b034cdf1d4711a3ddcdfbaa8d825a05"}, + {file = "uvloop-0.17.0-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:6708f30db9117f115eadc4f125c2a10c1a50d711461699a0cbfaa45b9a78e376"}, + {file = "uvloop-0.17.0-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:23609ca361a7fc587031429fa25ad2ed7242941adec948f9d10c045bfecab06b"}, + {file = "uvloop-0.17.0-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:2deae0b0fb00a6af41fe60a675cec079615b01d68beb4cc7b722424406b126a8"}, + {file = "uvloop-0.17.0-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:45cea33b208971e87a31c17622e4b440cac231766ec11e5d22c76fab3bf9df62"}, + {file = "uvloop-0.17.0-cp37-cp37m-musllinux_1_1_aarch64.whl", hash = "sha256:9b09e0f0ac29eee0451d71798878eae5a4e6a91aa275e114037b27f7db72702d"}, + {file = "uvloop-0.17.0-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:dbbaf9da2ee98ee2531e0c780455f2841e4675ff580ecf93fe5c48fe733b5667"}, + {file = "uvloop-0.17.0-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:a4aee22ece20958888eedbad20e4dbb03c37533e010fb824161b4f05e641f738"}, + {file = "uvloop-0.17.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:307958f9fc5c8bb01fad752d1345168c0abc5d62c1b72a4a8c6c06f042b45b20"}, + {file = "uvloop-0.17.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:3ebeeec6a6641d0adb2ea71dcfb76017602ee2bfd8213e3fcc18d8f699c5104f"}, + {file = "uvloop-0.17.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:1436c8673c1563422213ac6907789ecb2b070f5939b9cbff9ef7113f2b531595"}, + {file = "uvloop-0.17.0-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:8887d675a64cfc59f4ecd34382e5b4f0ef4ae1da37ed665adba0c2badf0d6578"}, + {file = "uvloop-0.17.0-cp38-cp38-musllinux_1_1_x86_64.whl", hash = 
"sha256:3db8de10ed684995a7f34a001f15b374c230f7655ae840964d51496e2f8a8474"}, + {file = "uvloop-0.17.0-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:7d37dccc7ae63e61f7b96ee2e19c40f153ba6ce730d8ba4d3b4e9738c1dccc1b"}, + {file = "uvloop-0.17.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:cbbe908fda687e39afd6ea2a2f14c2c3e43f2ca88e3a11964b297822358d0e6c"}, + {file = "uvloop-0.17.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:3d97672dc709fa4447ab83276f344a165075fd9f366a97b712bdd3fee05efae8"}, + {file = "uvloop-0.17.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f1e507c9ee39c61bfddd79714e4f85900656db1aec4d40c6de55648e85c2799c"}, + {file = "uvloop-0.17.0-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:c092a2c1e736086d59ac8e41f9c98f26bbf9b9222a76f21af9dfe949b99b2eb9"}, + {file = "uvloop-0.17.0-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:30babd84706115626ea78ea5dbc7dd8d0d01a2e9f9b306d24ca4ed5796c66ded"}, + {file = "uvloop-0.17.0.tar.gz", hash = "sha256:0ddf6baf9cf11a1a22c71487f39f15b2cf78eb5bde7e5b45fbb99e8a9d91b9e1"}, ] virtualenv = [ {file = "virtualenv-20.16.2-py2.py3-none-any.whl", hash = "sha256:635b272a8e2f77cb051946f46c60a54ace3cb5e25568228bd6b57fc70eca9ff3"}, diff --git a/pyproject.toml b/pyproject.toml index f82597f..6fa1a50 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,11 +1,13 @@ [tool.poetry] name = "taskiq" -version = "0.1.1" +version = "0.1.2" description = "Distributed task queue with full async support" authors = ["Pavel Kirilin "] maintainers = ["Pavel Kirilin "] readme = "README.md" repository = "https://github.com/taskiq-python/taskiq" +homepage = "https://taskiq-python.github.io/" +documentation = "https://taskiq-python.github.io/" license = "LICENSE" classifiers = [ "Typing :: Typed", @@ -21,7 +23,6 @@ classifiers = [ "Topic :: System :: Networking", "Development Status :: 3 - Alpha", ] -homepage = "https://github.com/taskiq-python/taskiq" keywords = ["taskiq", 
"tasks", "distributed", "async"] [tool.poetry.dependencies] @@ -34,6 +35,7 @@ watchdog = "^2.1.9" gitignore-parser = "^0.1.0" importlib-metadata = "<4.3" pycron = "^3.0.0" +graphlib-backport = { version = "^1.0.3", python="<3.9" } [tool.poetry.dev-dependencies] diff --git a/taskiq/__init__.py b/taskiq/__init__.py index 4a12354..5713c9a 100644 --- a/taskiq/__init__.py +++ b/taskiq/__init__.py @@ -8,11 +8,14 @@ from taskiq.brokers.shared_broker import async_shared_broker from taskiq.brokers.zmq_broker import ZeroMQBroker from taskiq.context import Context +from taskiq.dependencies import TaskiqDepends +from taskiq.events import TaskiqEvents from taskiq.exceptions import TaskiqError from taskiq.funcs import gather from taskiq.message import BrokerMessage, TaskiqMessage from taskiq.result import TaskiqResult from taskiq.scheduler import ScheduledTask, TaskiqScheduler +from taskiq.state import TaskiqState from taskiq.task import AsyncTaskiqTask __all__ = [ @@ -20,13 +23,16 @@ "Context", "AsyncBroker", "TaskiqError", + "TaskiqState", "TaskiqResult", "ZeroMQBroker", + "TaskiqEvents", "TaskiqMessage", "BrokerMessage", + "ScheduledTask", + "TaskiqDepends", "InMemoryBroker", "ScheduleSource", - "ScheduledTask", "TaskiqScheduler", "TaskiqFormatter", "AsyncTaskiqTask", diff --git a/taskiq/abc/broker.py b/taskiq/abc/broker.py index 431013b..8e47291 100644 --- a/taskiq/abc/broker.py +++ b/taskiq/abc/broker.py @@ -1,14 +1,16 @@ -import inspect import os import sys from abc import ABC, abstractmethod +from collections import defaultdict from functools import wraps from logging import getLogger from typing import ( # noqa: WPS235 TYPE_CHECKING, Any, + Awaitable, Callable, Coroutine, + DefaultDict, Dict, List, Optional, @@ -18,22 +20,27 @@ ) from uuid import uuid4 -from typing_extensions import ParamSpec +from typing_extensions import ParamSpec, TypeAlias +from taskiq.abc.middleware import TaskiqMiddleware from taskiq.decor import AsyncTaskiqDecoratedTask +from taskiq.events import 
TaskiqEvents from taskiq.formatters.json_formatter import JSONFormatter from taskiq.message import BrokerMessage from taskiq.result_backends.dummy import DummyResultBackend +from taskiq.state import TaskiqState +from taskiq.utils import maybe_awaitable -if TYPE_CHECKING: +if TYPE_CHECKING: # pragma: no cover from taskiq.abc.formatter import TaskiqFormatter - from taskiq.abc.middleware import TaskiqMiddleware from taskiq.abc.result_backend import AsyncResultBackend _T = TypeVar("_T") # noqa: WPS111 _FuncParams = ParamSpec("_FuncParams") _ReturnType = TypeVar("_ReturnType") +EventHandler: TypeAlias = Callable[[TaskiqState], Optional[Awaitable[None]]] + logger = getLogger("taskiq") @@ -49,7 +56,7 @@ def default_id_generator() -> str: return uuid4().hex -class AsyncBroker(ABC): +class AsyncBroker(ABC): # noqa: WPS230 """ Async broker. @@ -75,8 +82,16 @@ def __init__( self.decorator_class = AsyncTaskiqDecoratedTask self.formatter: "TaskiqFormatter" = JSONFormatter() self.id_generator = task_id_generator - - def add_middlewares(self, middlewares: "List[TaskiqMiddleware]") -> None: + # Every event has a list of handlers. + # Every handler is a function which takes state as a first argument. + # And handler can be either sync or async. + self.event_handlers: DefaultDict[ # noqa: WPS234 + TaskiqEvents, + List[Callable[[TaskiqState], Optional[Awaitable[None]]]], + ] = defaultdict(list) + self.state = TaskiqState() + + def add_middlewares(self, *middlewares: "TaskiqMiddleware") -> None: """ Add a list of middlewares. @@ -86,11 +101,23 @@ def add_middlewares(self, middlewares: "List[TaskiqMiddleware]") -> None: :param middlewares: list of middlewares. """ for middleware in middlewares: + if not isinstance(middleware, TaskiqMiddleware): + logger.warning( + f"Middleware {middleware} is not an instance of TaskiqMiddleware. 
" + "Skipping...", + ) + continue middleware.set_broker(self) self.middlewares.append(middleware) async def startup(self) -> None: """Do something when starting broker.""" + event = TaskiqEvents.CLIENT_STARTUP + if self.is_worker_process: + event = TaskiqEvents.WORKER_STARTUP + + for handler in self.event_handlers[event]: + await maybe_awaitable(handler(self.state)) async def shutdown(self) -> None: """ @@ -99,11 +126,13 @@ async def shutdown(self) -> None: This method is called, when broker is closig. """ - for middleware in self.middlewares: - middleware_shutdown = middleware.shutdown() - if inspect.isawaitable(middleware_shutdown): - await middleware_shutdown - await self.result_backend.shutdown() + event = TaskiqEvents.CLIENT_SHUTDOWN + if self.is_worker_process: + event = TaskiqEvents.WORKER_SHUTDOWN + + # Call all shutdown events. + for handler in self.event_handlers[event]: + await maybe_awaitable(handler(self.state)) @abstractmethod async def kick( @@ -138,7 +167,7 @@ async def listen( def task( self, task_name: Callable[_FuncParams, _ReturnType], - ) -> AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType]: + ) -> AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType]: # pragma: no cover ... @overload @@ -149,7 +178,7 @@ def task( ) -> Callable[ [Callable[_FuncParams, _ReturnType]], AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType], - ]: + ]: # pragma: no cover ... def task( # type: ignore[misc] @@ -193,7 +222,7 @@ def inner( nonlocal inner_task_name # noqa: WPS420 if inner_task_name is None: fmodule = func.__module__ - if fmodule == "__main__": + if fmodule == "__main__": # pragma: no cover fmodule = ".".join( # noqa: WPS220 sys.argv[0] .removesuffix( @@ -232,3 +261,43 @@ def inner( inner_task_name=task_name, inner_labels=labels or {}, ) + + def on_event(self, *events: TaskiqEvents) -> Callable[[EventHandler], EventHandler]: + """ + Adds event handler. + + This function adds function to call when event occurs. + + :param events: events to react to. 
+ :return: a decorator function. + """ + + def handler(function: EventHandler) -> EventHandler: + for event in events: + self.event_handlers[event].append(function) + return function + + return handler + + def add_event_handler( + self, + event: TaskiqEvents, + handler: EventHandler, + ) -> None: + """ + Adds event handler. + + this function is the same as on_event. + + >>> broker.add_event_handler(TaskiqEvents.WORKER_STARTUP, my_startup) + + if similar to: + + >>> @broker.on_event(TaskiqEvents.WORKER_STARTUP) + >>> async def my_startup(context: Context) -> None: + >>> ... + + :param event: Event to react to. + :param handler: handler to call when event is started. + """ + self.event_handlers[event].append(handler) diff --git a/taskiq/abc/middleware.py b/taskiq/abc/middleware.py index 5e8df6a..ffff7d8 100644 --- a/taskiq/abc/middleware.py +++ b/taskiq/abc/middleware.py @@ -1,6 +1,6 @@ from typing import TYPE_CHECKING, Any, Coroutine, Union -if TYPE_CHECKING: # pragma: no cover +if TYPE_CHECKING: # pragma: no cover # pragma: no cover from taskiq.abc.broker import AsyncBroker from taskiq.message import TaskiqMessage from taskiq.result import TaskiqResult @@ -20,9 +20,6 @@ def set_broker(self, broker: "AsyncBroker") -> None: """ self.broker = broker - def shutdown(self) -> Union[None, Coroutine[Any, Any, None]]: - """This function is used to do some work on shutdown.""" - def pre_send( self, message: "TaskiqMessage", diff --git a/taskiq/abc/schedule_source.py b/taskiq/abc/schedule_source.py index f4b3550..7b732e4 100644 --- a/taskiq/abc/schedule_source.py +++ b/taskiq/abc/schedule_source.py @@ -1,7 +1,7 @@ from abc import ABC, abstractmethod from typing import TYPE_CHECKING, List -if TYPE_CHECKING: +if TYPE_CHECKING: # pragma: no cover from taskiq.scheduler.scheduler import ScheduledTask diff --git a/taskiq/abc/tests/test_broker.py b/taskiq/abc/tests/test_broker.py new file mode 100644 index 0000000..631a08c --- /dev/null +++ b/taskiq/abc/tests/test_broker.py @@ -0,0 
+1,66 @@ +from typing import Any, Callable, Coroutine + +from taskiq.abc.broker import AsyncBroker +from taskiq.decor import AsyncTaskiqDecoratedTask +from taskiq.message import BrokerMessage + + +class _TestBroker(AsyncBroker): + """Broker for testing purpose.""" + + async def kick(self, message: BrokerMessage) -> None: + """ + This method is used to send messages. + + But in this case it just throws messages away. + + :param message: message to lost. + """ + + async def listen( + self, + callback: Callable[[BrokerMessage], Coroutine[Any, Any, None]], + ) -> None: + """ + This method is not implemented. + + :param callback: callback that is never called. + """ + + +def test_decorator_success() -> None: + """Test that decoration without parameters works.""" + tbrok = _TestBroker() + + @tbrok.task + async def test_func() -> None: + """Some test function.""" + + assert isinstance(test_func, AsyncTaskiqDecoratedTask) + + +def test_decorator_with_name_success() -> None: + """Test that task_name is successfully set.""" + tbrok = _TestBroker() + + @tbrok.task(task_name="my_task") + async def test_func() -> None: + """Some test function.""" + + assert isinstance(test_func, AsyncTaskiqDecoratedTask) + assert test_func.task_name == "my_task" + + +def test_decorator_with_labels_success() -> None: + """Tests that labels are assigned for task as is.""" + tbrok = _TestBroker() + + @tbrok.task(label1=1, label2=2) + async def test_func() -> None: + """Some test function.""" + + assert isinstance(test_func, AsyncTaskiqDecoratedTask) + assert test_func.labels == { + "label1": 1, + "label2": 2, + } diff --git a/taskiq/brokers/inmemory_broker.py b/taskiq/brokers/inmemory_broker.py index ae1a9fc..8cb108c 100644 --- a/taskiq/brokers/inmemory_broker.py +++ b/taskiq/brokers/inmemory_broker.py @@ -6,8 +6,11 @@ from taskiq.abc.result_backend import AsyncResultBackend, TaskiqResult from taskiq.cli.worker.args import WorkerArgs from taskiq.cli.worker.receiver import Receiver +from 
taskiq.dependencies import DependencyGraph +from taskiq.events import TaskiqEvents from taskiq.exceptions import TaskiqError from taskiq.message import BrokerMessage +from taskiq.utils import maybe_awaitable _ReturnType = TypeVar("_ReturnType") @@ -124,6 +127,10 @@ async def kick(self, message: BrokerMessage) -> None: target_task = self.available_tasks.get(message.task_name) if target_task is None: raise TaskiqError("Unknown task.") + if not self.receiver.dependency_graphs.get(target_task.task_name): + self.receiver.dependency_graphs[target_task.task_name] = DependencyGraph( + target_task.original_func, + ) if not self.receiver.task_signatures.get(target_task.task_name): self.receiver.task_signatures[target_task.task_name] = inspect.signature( target_task.original_func, @@ -149,3 +156,15 @@ async def listen( :raises RuntimeError: if this method is called. """ raise RuntimeError("Inmemory brokers cannot listen.") + + async def startup(self) -> None: + """Runs startup events for client and worker side.""" + for event in (TaskiqEvents.CLIENT_STARTUP, TaskiqEvents.WORKER_STARTUP): + for handler in self.event_handlers.get(event, []): + await maybe_awaitable(handler(self.state)) + + async def shutdown(self) -> None: + """Runs shutdown events for client and worker side.""" + for event in (TaskiqEvents.CLIENT_SHUTDOWN, TaskiqEvents.WORKER_SHUTDOWN): + for handler in self.event_handlers.get(event, []): + await maybe_awaitable(handler(self.state)) diff --git a/taskiq/cli/worker/log_collector.py b/taskiq/cli/worker/log_collector.py index 4d3327d..3c1790b 100644 --- a/taskiq/cli/worker/log_collector.py +++ b/taskiq/cli/worker/log_collector.py @@ -1,7 +1,23 @@ import logging import sys from contextlib import contextmanager -from typing import Generator, List, TextIO +from typing import IO, Any, Generator, List, TextIO + + +class Redirector: + """A class to write to multiple streams.""" + + def __init__(self, *streams: IO[Any]) -> None: + self.streams = streams + + def 
write(self, message: Any) -> None: + """ + This write request writes to all avaialble streams. + + :param message: message to write. + """ + for stream in self.streams: + stream.write(message) @contextmanager @@ -36,8 +52,8 @@ def log_collector( old_targets.extend([sys.stdout, sys.stderr]) logging.root.addHandler(log_handler) - sys.stdout = new_target - sys.stderr = new_target + sys.stdout = Redirector(new_target, sys.stdout) # type: ignore + sys.stderr = Redirector(new_target, sys.stderr) # type: ignore try: yield new_target diff --git a/taskiq/cli/worker/receiver.py b/taskiq/cli/worker/receiver.py index a58d8e7..cba6489 100644 --- a/taskiq/cli/worker/receiver.py +++ b/taskiq/cli/worker/receiver.py @@ -12,38 +12,15 @@ from taskiq.cli.worker.log_collector import log_collector from taskiq.cli.worker.params_parser import parse_params from taskiq.context import Context +from taskiq.dependencies import DependencyGraph from taskiq.message import BrokerMessage, TaskiqMessage from taskiq.result import TaskiqResult +from taskiq.state import TaskiqState from taskiq.utils import maybe_awaitable logger = getLogger(__name__) -def inject_context( - type_hints: Dict[str, Any], - message: TaskiqMessage, - broker: AsyncBroker, -) -> None: - """ - Inject context parameter in message's kwargs. - - This function parses signature to get - the context parameter definition. - - If at least one parameter has the Context - type, it will add current context as kwarg. - - :param type_hints: function's type hints. - :param message: current taskiq message. - :param broker: current broker. - """ - if not type_hints: - return - for param_name, param_type in type_hints.items(): - if param_type is Context: - message.kwargs[param_name] = Context(message.copy(), broker) - - def _run_sync(target: Callable[..., Any], message: TaskiqMessage) -> Any: """ Runs function synchronously. 
@@ -66,9 +43,11 @@ def __init__(self, broker: AsyncBroker, cli_args: WorkerArgs) -> None: self.cli_args = cli_args self.task_signatures: Dict[str, inspect.Signature] = {} self.task_hints: Dict[str, Dict[str, Any]] = {} + self.dependency_graphs: Dict[str, DependencyGraph] = {} for task in self.broker.available_tasks.values(): self.task_signatures[task.task_name] = inspect.signature(task.original_func) self.task_hints[task.task_name] = get_type_hints(task.original_func) + self.dependency_graphs[task.task_name] = DependencyGraph(task.original_func) self.executor = ThreadPoolExecutor( max_workers=cli_args.max_threadpool_threads, ) @@ -175,16 +154,27 @@ async def run_task( # noqa: C901, WPS210 returned = None found_exception = None signature = self.task_signatures.get(message.task_name) + dependency_graph = self.dependency_graphs.get(message.task_name) if self.cli_args.no_parse: signature = None parse_params(signature, self.task_hints.get(message.task_name) or {}, message) - inject_context( - self.task_hints.get(message.task_name) or {}, - message, - self.broker, - ) + # Captures function's logs. with log_collector(logs, self.cli_args.log_collector_format): + dep_ctx = None + if dependency_graph: + # Create a context for dependency resolving. + dep_ctx = dependency_graph.ctx( + { + Context: Context(message, self.broker), + TaskiqState: self.broker.state, + }, + ) + # Resolve all function's dependencies. + dep_kwargs = await dep_ctx.resolve_kwargs() + for key, val in dep_kwargs.items(): + if key not in message.kwargs: + message.kwargs[key] = val # Start a timer. start_time = time() try: @@ -209,6 +199,8 @@ async def run_task( # noqa: C901, WPS210 ) # Stop the timer. 
execution_time = time() - start_time + if dep_ctx: + await dep_ctx.close() raw_logs = logs.getvalue() logs.close() diff --git a/taskiq/cli/worker/tests/test_context.py b/taskiq/cli/worker/tests/test_context.py deleted file mode 100644 index 5927cdf..0000000 --- a/taskiq/cli/worker/tests/test_context.py +++ /dev/null @@ -1,108 +0,0 @@ -from typing import get_type_hints - -from taskiq.cli.worker.receiver import inject_context -from taskiq.context import Context -from taskiq.message import TaskiqMessage - - -def test_inject_context_success() -> None: - """Test that context variable is injected as expected.""" - - def func(param1: int, ctx: Context) -> int: - return param1 - - message = TaskiqMessage( - task_id="", - task_name="", - labels={}, - args=[1], - kwargs={}, - ) - - inject_context( - get_type_hints(func), - message=message, - broker=None, # type: ignore - ) - - assert message.kwargs.get("ctx") - assert isinstance(message.kwargs["ctx"], Context) - - -def test_inject_context_success_string_annotation() -> None: - """ - Test that context variable is injected as expected. - - This test checks that if Context was provided as - string, then everything is work as expected. 
- """ - - def func(param1: int, ctx: "Context") -> int: - return param1 - - message = TaskiqMessage( - task_id="", - task_name="", - labels={}, - args=[1], - kwargs={}, - ) - - inject_context( - get_type_hints(func), - message=message, - broker=None, # type: ignore - ) - - assert message.kwargs.get("ctx") - assert isinstance(message.kwargs["ctx"], Context) - - -def test_inject_context_no_typehint() -> None: - """Test that context won't be injected in untyped parameter.""" - - def func(param1: int, ctx) -> int: # type: ignore - return param1 - - message = TaskiqMessage( - task_id="", - task_name="", - labels={}, - args=[1], - kwargs={}, - ) - - inject_context( - get_type_hints(func), - message=message, - broker=None, # type: ignore - ) - - assert message.kwargs.get("ctx") is None - - -def test_inject_context_no_ctx_parameter() -> None: - """ - Tests that injector won't raise an error. - - If the Context-typed parameter doesn't exist. - """ - - def func(param1: int) -> int: - return param1 - - message = TaskiqMessage( - task_id="", - task_name="", - labels={}, - args=[1], - kwargs={}, - ) - - inject_context( - get_type_hints(func), - message=message, - broker=None, # type: ignore - ) - - assert not message.kwargs diff --git a/taskiq/cli/worker/tests/test_receiver.py b/taskiq/cli/worker/tests/test_receiver.py index 271d38e..f917c59 100644 --- a/taskiq/cli/worker/tests/test_receiver.py +++ b/taskiq/cli/worker/tests/test_receiver.py @@ -121,7 +121,7 @@ def test_func() -> None: raise ValueError() broker = InMemoryBroker() - broker.add_middlewares([_TestMiddleware()]) + broker.add_middlewares(_TestMiddleware()) receiver = get_receiver(broker) result = await receiver.run_task( diff --git a/taskiq/context.py b/taskiq/context.py index 05f189b..bfc59a0 100644 --- a/taskiq/context.py +++ b/taskiq/context.py @@ -1,6 +1,11 @@ +from typing import TYPE_CHECKING + from taskiq.abc.broker import AsyncBroker from taskiq.message import TaskiqMessage +if TYPE_CHECKING: # pragma: no 
cover + from taskiq.state import TaskiqState + class Context: """Context class.""" @@ -8,6 +13,5 @@ class Context: def __init__(self, message: TaskiqMessage, broker: AsyncBroker) -> None: self.message = message self.broker = broker - - -default_context = Context(None, None) # type: ignore + self.state: "TaskiqState" = None # type: ignore + self.state = broker.state diff --git a/taskiq/decor.py b/taskiq/decor.py index a6f397b..cb42f6b 100644 --- a/taskiq/decor.py +++ b/taskiq/decor.py @@ -14,7 +14,7 @@ from taskiq.kicker import AsyncKicker from taskiq.task import AsyncTaskiqTask, SyncTaskiqTask -if TYPE_CHECKING: +if TYPE_CHECKING: # pragma: no cover from taskiq.abc.broker import AsyncBroker _T = TypeVar("_T") # noqa: WPS111 diff --git a/taskiq/dependencies.py b/taskiq/dependencies.py new file mode 100644 index 0000000..be91c16 --- /dev/null +++ b/taskiq/dependencies.py @@ -0,0 +1,426 @@ +import inspect +from asyncio import iscoroutine +from collections import defaultdict, deque +from copy import copy +from graphlib import TopologicalSorter +from typing import ( # noqa: WPS235 + Any, + AsyncGenerator, + Callable, + Coroutine, + Dict, + Generator, + List, + Optional, + Type, + TypeVar, + Union, + get_type_hints, + overload, +) + +_T = TypeVar("_T") # noqa: WPS111 + + +@overload +def TaskiqDepends( # noqa: WPS234 + dependency: Optional[Callable[..., AsyncGenerator[_T, None]]] = None, + *, + use_cache: bool = True, + kwargs: Optional[Dict[str, Any]] = None, +) -> _T: + ... + + +@overload +def TaskiqDepends( # noqa: WPS234 + dependency: Optional[Callable[..., Generator[_T, None, None]]] = None, + *, + use_cache: bool = True, + kwargs: Optional[Dict[str, Any]] = None, +) -> _T: + ... + + +@overload +def TaskiqDepends( + dependency: Optional[Type[_T]] = None, + *, + use_cache: bool = True, + kwargs: Optional[Dict[str, Any]] = None, +) -> _T: + ... 
+ + +@overload +def TaskiqDepends( # noqa: WPS234 + dependency: Optional[Callable[..., Coroutine[Any, Any, _T]]] = None, + *, + use_cache: bool = True, + kwargs: Optional[Dict[str, Any]] = None, +) -> _T: + ... + + +@overload +def TaskiqDepends( + dependency: Optional[Callable[..., _T]] = None, + *, + use_cache: bool = True, + kwargs: Optional[Dict[str, Any]] = None, +) -> _T: + ... + + +def TaskiqDepends( + dependency: Optional[Any] = None, + *, + use_cache: bool = True, + kwargs: Optional[Dict[str, Any]] = None, +) -> Any: + """ + Constructs a dependency. + + This function returns TaskiqDepends + and needed for typehinting. + + :param dependency: function to run as a dependency. + :param use_cache: whether the dependency + can use previously calculated dependencies. + :param kwargs: optional keyword arguments to the dependency. + May be used to parametrize dependencies. + :return: TaskiqDepends instance. + """ + return _TaskiqDepends( + dependency=dependency, + use_cache=use_cache, + kwargs=kwargs, + ) + + +class _TaskiqDepends: + """ + Class to mark parameter as a dependency. + + This class is used to mark parameters of a function, + or a class as injectables, so taskiq can resolve it + and calculate before execution. + """ + + def __init__( # noqa: WPS234 + self, + dependency: Optional[Union[Type[Any], Callable[..., Any]]] = None, + *, + use_cache: bool = True, + kwargs: Optional[Dict[str, Any]] = None, + ) -> None: + self.dependency = dependency + self.use_cache = use_cache + self.param_name = "" + self.kwargs = kwargs or {} + + def __hash__(self) -> int: + return hash((self.dependency, self.use_cache, tuple(self.kwargs.keys()))) + + def __eq__(self, rhs: object) -> bool: + """ + Overriden eq operation. + + This is required to perform correct topological + sort after building dependency graph. + + :param rhs: object to compare. + :return: True if objects are equal. 
+ """ + if not isinstance(rhs, _TaskiqDepends): + return False + return (self.dependency, self.use_cache, self.kwargs) == ( + rhs.dependency, + rhs.use_cache, + rhs.kwargs, + ) + + def __str__(self) -> str: + if self.dependency is None: + dep_name = "" + else: + dep_name = ( + f"{self.dependency.__module__}:" # noqa: WPS237 + f"{self.dependency.__name__}" + ) + return ( + f"TaskiqDepends({dep_name}, " + f"use_cache={self.use_cache}, " + f"kwargs={self.kwargs})" + ) + + +class DependencyResolveContext: + """ + Resolver context. + + This class is used to resolve dependencies + with custom initial caches. + + The main idea is to separate resolving and graph building. + It uses graph, but it doesn't modify it. + """ + + def __init__( + self, + graph: "DependencyGraph", + initial_cache: Optional[Dict[Any, Any]] = None, + ) -> None: + self.graph = graph + self.opened_dependencies: List[Any] = [] + self.sub_contexts: List["DependencyResolveContext"] = [] + self.initial_cache = initial_cache or {} + + async def __aenter__(self) -> "DependencyResolveContext": + return self + + async def __aexit__(self, *args: Any) -> None: + await self.close() + + async def close(self) -> None: # noqa: C901 + """ + Close all opened dependencies. + + This function runs teardown of all dependencies. + """ + for ctx in self.sub_contexts: + await ctx.close() + for dep in reversed(self.opened_dependencies): + if inspect.isgenerator(dep): + for _ in dep: # noqa: WPS328 + pass # noqa: WPS420 + elif inspect.isasyncgen(dep): + async for _ in dep: # noqa: WPS328 + pass # noqa: WPS420 + + async def resolve_kwargs( # noqa: C901, WPS210 + self, + ) -> Dict[str, Any]: + """ + Resolve dependencies and return them as a dict. + + This function runs all dependencies + and calculates key word arguments required to run target function. + + :return: Dict with keyword arguments. + """ + # If we have nothing to calculate, we return + # an empty dict. 
+        if self.graph.is_empty():
+            return {}
+        kwargs: Dict[str, Any] = {}
+        # We need to copy cache, in order
+        # to separate dependencies that use cache,
+        # from dependencies that aren't.
+        cache = copy(self.initial_cache)
+        # We iterate over topologically sorted list of dependencies.
+        for index, dep in enumerate(self.graph.ordered_deps):
+            # If this dependency doesn't use cache,
+            # we don't need to calculate it, since it may be met
+            # later.
+            if not dep.use_cache:
+                continue
+            # If somehow we have dependency with unknown function.
+            if dep.dependency is None:
+                continue
+            # If dependency is already calculated.
+            if dep.dependency in cache:
+                continue
+            kwargs = {}
+            # Now we get list of dependencies for current top-level dependency
+            # and iterate over it.
+            for subdep in self.graph.dependencies[dep]:
+                # If we don't have known dependency function,
+                # we skip it.
+                if subdep.dependency is None:
+                    continue
+                if subdep.use_cache:
+                    # If this dependency can be calculated, using cache,
+                    # we try to get it from cache.
+                    kwargs[subdep.param_name] = cache[subdep.dependency]
+                else:
+                    # If this dependency doesn't use cache,
+                    # we resolve its dependencies and
+                    # run it.
+                    subctx = self.graph.subgraphs[subdep].ctx(self.initial_cache)
+                    # Add this graph resolve context to the list of subcontexts.
+                    # We'll close it later.
+ self.sub_contexts.append(subctx) + resolved_kwargs = await subctx.resolve_kwargs() + if subdep.kwargs: + resolved_kwargs.update(subdep.kwargs) + subdep_exec = subdep.dependency(**resolved_kwargs) + if inspect.isgenerator(subdep_exec): + sub_result = next(subdep_exec) + self.opened_dependencies.append(subdep_exec) + elif iscoroutine(subdep_exec): + sub_result = await subdep_exec + elif inspect.isasyncgen(subdep_exec): + sub_result = await subdep_exec.__anext__() # noqa: WPS609 + self.opened_dependencies.append(subdep_exec) + else: + sub_result = subdep_exec + + kwargs[subdep.param_name] = sub_result + # We don't want to calculate least function, + # Because it's a target function. + if index < len(self.graph.ordered_deps) - 1: + user_kwargs = dep.kwargs + user_kwargs.update(kwargs) + cache_param = dep.dependency(**user_kwargs) + if inspect.isgenerator(cache_param): + result = next(cache_param) + self.opened_dependencies.append(cache_param) + elif iscoroutine(cache_param): + result = await cache_param + elif inspect.isasyncgen(cache_param): + result = await cache_param.__anext__() # noqa: WPS609 + self.opened_dependencies.append(cache_param) + else: + result = cache_param + cache[dep.dependency] = result + return kwargs + + +class DependencyGraph: + """Class to build dependency graph from a function.""" + + def __init__( + self, + target: Callable[..., Any], + ) -> None: + self.target = target + # Ordinary dependencies with cache. + self.dependencies: Dict[Any, List[_TaskiqDepends]] = defaultdict(list) + # Dependencies without cache. + # Can be considered as sub graphs. + self.subgraphs: Dict[Any, DependencyGraph] = {} + self.ordered_deps: List[_TaskiqDepends] = [] + self._build_graph() + + def is_empty(self) -> bool: + """ + Checks that target function depends on at least something. + + :return: True if depends. 
+ """ + return len(self.ordered_deps) <= 1 + + def ctx( + self, + initial_cache: Optional[Dict[Any, Any]] = None, + ) -> DependencyResolveContext: + """ + Create dependency resolver context. + + This context is used to actually resolve dependencies. + + :param initial_cache: initial cache dict. + :return: new resolver context. + """ + return DependencyResolveContext( + self, + initial_cache, + ) + + def _build_graph(self) -> None: # noqa: C901, WPS210 + """ + Builds actual graph. + + This function collects all dependencies + and adds it the the _deps variable. + + After all dependencies are found, + it runs topological sort, to get the + dependency resolving order. + + :raises ValueError: if something happened. + """ + dep_deque = deque([_TaskiqDepends(self.target, use_cache=True)]) + + while dep_deque: + dep = dep_deque.popleft() + # Skip adding dependency if it's already present. + if dep in self.dependencies: + continue + if dep.dependency is None: + continue + # Get signature and type hints. + sign = inspect.signature(dep.dependency) + if inspect.isclass(dep.dependency): + # If this is a class, we need to get signature of + # an __init__ method. + hints = get_type_hints(dep.dependency.__init__) # noqa: WPS609 + else: + # If this is function, we get it's type hints. + hints = get_type_hints(dep.dependency) + + # Now we need to iterate over parameters, to + # find all parameters, that have TaskiqDepends as it's + # default vaule. + for param_name, param in sign.parameters.items(): + # We check, that default value is an instance of + # TaskiqDepends. + if not isinstance(param.default, _TaskiqDepends): + continue + + # If user haven't set the dependency, + # using TaskiqDepends constructor, + # we need to find variable's type hint. + if param.default.dependency is None: + if hints.get(param_name) is None: + # In this case, we don't know anything + # about this dependency. And it cannot be resolved. 
+                        dep_mod = "unknown"
+                        dep_name = "unknown"
+                        if dep.dependency is not None:
+                            dep_mod = dep.dependency.__module__
+                            if inspect.isclass(dep.dependency):
+                                dep_name = dep.dependency.__class__.__name__
+                            else:
+                                dep_name = dep.dependency.__name__
+                        raise ValueError(
+                            f"The dependency {param_name} of "
+                            f"{dep_mod}:{dep_name} cannot be resolved.",
+                        )
+                    # We get dependency class from typehint.
+                    dependency_func = hints[param_name]
+                else:
+                    # We can get dependency by simply using
+                    # user supplied function.
+                    dependency_func = param.default.dependency
+
+                # Now we construct new TaskiqDepends instance
+                # with correct dependency function and cache.
+                dep_obj = _TaskiqDepends(
+                    dependency_func,
+                    use_cache=param.default.use_cache,
+                    kwargs=param.default.kwargs,
+                )
+                # Also we set the parameter name,
+                # it will help us in future when
+                # we're going to resolve all dependencies.
+                dep_obj.param_name = param_name
+
+                # We append current dependency
+                # to the list of dependencies of
+                # the current function.
+                self.dependencies[dep].append(dep_obj)
+                if dep_obj.use_cache:
+                    # If this dependency uses cache, we need to resolve
+                    # its dependencies further.
+                    dep_deque.append(dep_obj)
+                else:
+                    # If this dependency doesn't use caches,
+                    # we build a subgraph for this dependency.
+                    self.subgraphs[dep_obj] = DependencyGraph(
+                        dependency_func,
+                    )
+        # Now we perform topological sort of all dependencies.
+        # Now we know the order we'll be using to resolve dependencies.
+        self.ordered_deps = list(TopologicalSorter(self.dependencies).static_order())
diff --git a/taskiq/events.py b/taskiq/events.py
new file mode 100644
index 0000000..854cc99
--- /dev/null
+++ b/taskiq/events.py
@@ -0,0 +1,20 @@
+import enum
+
+
+@enum.unique
+class TaskiqEvents(enum.Enum):
+    """List of taskiq broker lifetime events."""
+
+    # Worker events.
+
+    # Called on worker startup.
+    WORKER_STARTUP = "WORKER_STARTUP"
+    # Called on worker shutdown.
+    WORKER_SHUTDOWN = "WORKER_SHUTDOWN"
+
+    # Client events.
+ + # Called when startup function is called from the client's code. + CLIENT_STARTUP = "CLIENT_STARTUP" + # Called if shutdown function was called from the client's code. + CLIENT_SHUTDOWN = "CLIENT_SHUTDOWN" diff --git a/taskiq/kicker.py b/taskiq/kicker.py index dd422c9..f6676e7 100644 --- a/taskiq/kicker.py +++ b/taskiq/kicker.py @@ -21,7 +21,7 @@ from taskiq.task import AsyncTaskiqTask, SyncTaskiqTask from taskiq.utils import maybe_awaitable, run_sync -if TYPE_CHECKING: +if TYPE_CHECKING: # pragma: no cover from taskiq.abc.broker import AsyncBroker _T = TypeVar("_T") # noqa: WPS111 diff --git a/taskiq/scheduler/merge_functions.py b/taskiq/scheduler/merge_functions.py index ac3b7a1..331857c 100644 --- a/taskiq/scheduler/merge_functions.py +++ b/taskiq/scheduler/merge_functions.py @@ -1,6 +1,6 @@ from typing import TYPE_CHECKING, List -if TYPE_CHECKING: +if TYPE_CHECKING: # pragma: no cover from taskiq.scheduler.scheduler import ScheduledTask diff --git a/taskiq/scheduler/scheduler.py b/taskiq/scheduler/scheduler.py index d94b1d8..095f08e 100644 --- a/taskiq/scheduler/scheduler.py +++ b/taskiq/scheduler/scheduler.py @@ -4,7 +4,7 @@ from taskiq.abc.broker import AsyncBroker from taskiq.scheduler.merge_functions import preserve_all -if TYPE_CHECKING: +if TYPE_CHECKING: # pragma: no cover from taskiq.abc.schedule_source import ScheduleSource diff --git a/taskiq/state.py b/taskiq/state.py new file mode 100644 index 0000000..2947b23 --- /dev/null +++ b/taskiq/state.py @@ -0,0 +1,39 @@ +from collections import UserDict +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: # pragma: no cover + _Base = UserDict[str, Any] +else: + _Base = UserDict + + +class TaskiqState(_Base): + """ + State class. + + This class is used to store useful variables + for later use. 
+ """ + + def __init__(self) -> None: + self.__dict__["data"] = {} + + def __getattr__(self, name: str) -> Any: + try: + return self.__dict__["data"][name] + except KeyError: + cls_name = self.__class__.__name__ + raise AttributeError(f"'{cls_name}' object has no attribute '{name}'") + + def __setattr__(self, name: str, value: Any) -> None: + self[name] = value + + def __delattr__(self, name: str) -> None: # noqa: WPS603 + try: + del self[name] # noqa: WPS420 + except KeyError: + cls_name = self.__class__.__name__ + raise AttributeError(f"'{cls_name}' object has no attribute '{name}'") + + def __str__(self) -> str: + return "TaskiqState(%s)" % super().__str__() diff --git a/taskiq/task.py b/taskiq/task.py index 54439b5..29d01de 100644 --- a/taskiq/task.py +++ b/taskiq/task.py @@ -10,7 +10,7 @@ ) from taskiq.utils import run_sync -if TYPE_CHECKING: +if TYPE_CHECKING: # pragma: no cover from taskiq.abc.result_backend import AsyncResultBackend from taskiq.result import TaskiqResult diff --git a/taskiq/tests/test_dependencies.py b/taskiq/tests/test_dependencies.py new file mode 100644 index 0000000..9097019 --- /dev/null +++ b/taskiq/tests/test_dependencies.py @@ -0,0 +1,175 @@ +import asyncio +from typing import AsyncGenerator, Generator + +import pytest + +from taskiq.dependencies import DependencyGraph, TaskiqDepends + + +@pytest.mark.anyio +async def test_dependency_successful() -> None: + """Test that a simlpe dependencies work.""" + + def dep1() -> int: + return 1 + + def testfunc(a: int = TaskiqDepends(dep1)) -> int: + return a + + async with DependencyGraph(testfunc).ctx({}) as ctx: + assert await ctx.resolve_kwargs() == {"a": 1} + + +@pytest.mark.anyio +async def test_dependency_async_successful() -> None: + """Test that async dependencies work fine.""" + + async def dep1() -> int: + await asyncio.sleep(0.001) + return 1 + + def testfunc(a: int = TaskiqDepends(dep1)) -> int: + return a + + async with DependencyGraph(testfunc).ctx({}) as ctx: + assert await 
ctx.resolve_kwargs() == {"a": 1} + + +@pytest.mark.anyio +async def test_dependency_gen_successful() -> None: + """Tests that generators work as expected.""" + starts = 0 + closes = 0 + + def dep1() -> Generator[int, None, None]: + nonlocal starts # noqa: WPS420 + nonlocal closes # noqa: WPS420 + + starts += 1 + + yield 1 + + closes += 1 + + def testfunc(a: int = TaskiqDepends(dep1)) -> int: + return a + + async with DependencyGraph(testfunc).ctx({}) as ctx: + assert await ctx.resolve_kwargs() == {"a": 1} + assert starts == 1 + assert closes == 0 + assert closes == 1 + + +@pytest.mark.anyio +async def test_dependency_async_gen_successful() -> None: + """This test checks that async generators work.""" + starts = 0 + closes = 0 + + async def dep1() -> AsyncGenerator[int, None]: + nonlocal starts # noqa: WPS420 + nonlocal closes # noqa: WPS420 + + await asyncio.sleep(0.001) + starts += 1 + + yield 1 + + await asyncio.sleep(0.001) + closes += 1 + + def testfunc(a: int = TaskiqDepends(dep1)) -> int: + return a + + async with DependencyGraph(testfunc).ctx({}) as ctx: + assert await ctx.resolve_kwargs() == {"a": 1} + assert starts == 1 + assert closes == 0 + assert closes == 1 + + +@pytest.mark.anyio +async def test_dependency_subdeps() -> None: + """Tests how subdependencies work.""" + + def dep1() -> int: + return 1 + + def dep2(a: int = TaskiqDepends(dep1)) -> int: + return a + 1 + + def testfunc(a: int = TaskiqDepends(dep2)) -> int: + return a + + async with DependencyGraph(testfunc).ctx({}) as ctx: + assert await ctx.resolve_kwargs() == {"a": 2} + + +@pytest.mark.anyio +async def test_dependency_caches() -> None: + """ + Tests how caches work. + + This test checks that + if multiple functions depend on one function, + This function must be calculated only once. 
+ """ + dep_exec = 0 + + def dep1() -> int: + nonlocal dep_exec # noqa: WPS420 + dep_exec += 1 + + return 1 + + def dep2(a: int = TaskiqDepends(dep1)) -> int: + return a + 1 + + def dep3(a: int = TaskiqDepends(dep1)) -> int: + return a + 1 + + def testfunc( + a: int = TaskiqDepends(dep2), + b: int = TaskiqDepends(dep3), + ) -> int: + return a + b + + async with DependencyGraph(testfunc).ctx({}) as ctx: + assert await ctx.resolve_kwargs() == {"a": 2, "b": 2} + + assert dep_exec == 1 + + +@pytest.mark.anyio +async def test_dependency_subgraph() -> None: + """ + Tests how subgraphs work. + + If use_cache is False it must force + dependency graph to reevaluate it's subdependencies. + """ + dep_exec = 0 + + def dep1() -> int: + nonlocal dep_exec # noqa: WPS420 + dep_exec += 1 + + return 1 + + def dep2(a: int = TaskiqDepends(dep1)) -> int: + return a + 1 + + def dep3(a: int = TaskiqDepends(dep1, use_cache=False)) -> int: + return a + 1 + + def testfunc( + a: int = TaskiqDepends(dep2), + b: int = TaskiqDepends(dep3), + ) -> int: + return a + b + + async with DependencyGraph(testfunc).ctx({}) as ctx: + assert await ctx.resolve_kwargs() == {"a": 2, "b": 2} + + assert dep_exec == 2 diff --git a/taskiq/tests/test_state.py b/taskiq/tests/test_state.py new file mode 100644 index 0000000..07c64e9 --- /dev/null +++ b/taskiq/tests/test_state.py @@ -0,0 +1,58 @@ +from taskiq.state import TaskiqState + + +def test_state_set() -> None: + """Tests that you can sel values as dict items.""" + state = TaskiqState() + state["a"] = 1 + + assert state["a"] == 1 + + +def test_state_get() -> None: + """Tests that you can get values as dict items.""" + state = TaskiqState() + + state["a"] = 1 + + assert state["a"] == 1 + + +def test_state_del() -> None: + """Tests that you can del values as dict items.""" + state = TaskiqState() + + state["a"] = 1 + + del state["a"] # noqa: WPS420 + + assert state.get("a") is None + + +def test_state_set_attr() -> None: + """Tests that you can set values by 
attribute.""" + state = TaskiqState() + + state.a = 1 + + assert state["a"] == 1 + + +def test_state_get_attr() -> None: + """Tests that you can get values by attribute.""" + state = TaskiqState() + + state["a"] = 1 + + assert state.a == 1 + + +def test_state_del_attr() -> None: + """Tests that you can delete values by attribute.""" + state = TaskiqState() + + state["a"] = 1 + + del state.a # noqa: WPS420 + + assert state.get("a") is None