-
-
Notifications
You must be signed in to change notification settings - Fork 52
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
36 changed files
with
1,435 additions
and
236 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
import asyncio | ||
from typing import AsyncGenerator | ||
|
||
from taskiq import TaskiqDepends | ||
|
||
|
||
async def dependency() -> AsyncGenerator[str, None]: | ||
print("Startup") | ||
await asyncio.sleep(0.1) | ||
|
||
yield "value" | ||
|
||
await asyncio.sleep(0.1) | ||
print("Shutdown") | ||
|
||
|
||
async def my_task(dep: str = TaskiqDepends(dependency)) -> None: | ||
print(dep.upper()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
from taskiq import TaskiqDepends | ||
|
||
|
||
async def db_connection() -> str: | ||
return "let's pretend as this is a connection" | ||
|
||
|
||
class MyDAO: | ||
def __init__(self, db_conn: str = TaskiqDepends(db_connection)) -> None: | ||
self.db_conn = db_conn | ||
|
||
def get_users(self) -> str: | ||
return self.db_conn.upper() | ||
|
||
|
||
def my_task(dao: MyDAO = TaskiqDepends()) -> None: | ||
print(dao.get_users()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
import random | ||
|
||
from taskiq import TaskiqDepends | ||
|
||
|
||
def common_dep() -> int: | ||
# For example it returns 8 | ||
return random.randint(1, 10) | ||
|
||
|
||
def dep1(cd: int = TaskiqDepends(common_dep)) -> int: | ||
# This function will return 9 | ||
return cd + 1 | ||
|
||
|
||
def dep2(cd: int = TaskiqDepends(common_dep)) -> int: | ||
# This function will return 10 | ||
return cd + 2 | ||
|
||
|
||
def my_task( | ||
d1: int = TaskiqDepends(dep1), | ||
d2: int = TaskiqDepends(dep2), | ||
) -> int: | ||
# This function will return 19 | ||
return d1 + d2 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
import asyncio | ||
from typing import Optional | ||
|
||
from redis.asyncio import ConnectionPool, Redis # type: ignore | ||
from taskiq_aio_pika import AioPikaBroker | ||
from taskiq_redis import RedisAsyncResultBackend | ||
|
||
from taskiq import Context, TaskiqDepends, TaskiqEvents, TaskiqState | ||
|
||
# To run this example, please install: | ||
# * taskiq | ||
# * taskiq-redis | ||
# * taskiq-aio-pika | ||
|
||
broker = AioPikaBroker( | ||
"amqp://localhost", | ||
result_backend=RedisAsyncResultBackend( | ||
"redis://localhost/0", | ||
), | ||
) | ||
|
||
|
||
@broker.on_event(TaskiqEvents.WORKER_STARTUP) | ||
async def startup(state: TaskiqState) -> None: | ||
# Here we store connection pool on startup for later use. | ||
state.redis = ConnectionPool.from_url("redis://localhost/1") | ||
|
||
|
||
@broker.on_event(TaskiqEvents.WORKER_SHUTDOWN) | ||
async def shutdown(state: TaskiqState) -> None: | ||
# Here we close our pool on shutdown event. | ||
await state.redis.disconnect() | ||
|
||
|
||
@broker.task | ||
async def get_val(key: str, context: Context = TaskiqDepends()) -> Optional[str]: | ||
# Now we can use our pool. | ||
redis = Redis(connection_pool=context.state.redis, decode_responses=True) | ||
return await redis.get(key) | ||
|
||
|
||
@broker.task | ||
async def set_val(key: str, value: str, context: Context = TaskiqDepends()) -> None: | ||
# Now we can use our pool to set value. | ||
await Redis(connection_pool=context.state.redis).set(key, value) | ||
|
||
|
||
async def main() -> None: | ||
await broker.startup() | ||
|
||
set_task = await set_val.kiq("key", "value") | ||
set_result = await set_task.wait_result(with_logs=True) | ||
if set_result.is_err: | ||
print(set_result.log) | ||
raise ValueError("Cannot set value in redis. See logs.") | ||
|
||
get_task = await get_val.kiq("key") | ||
get_res = await get_task.wait_result() | ||
print(f"Got redis value: {get_res.return_value}") | ||
|
||
await broker.shutdown() | ||
|
||
|
||
if __name__ == "__main__": | ||
asyncio.run(main()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
from typing import Generator | ||
|
||
from taskiq import TaskiqDepends | ||
|
||
|
||
def dependency() -> Generator[str, None, None]: | ||
print("Startup") | ||
|
||
yield "value" | ||
|
||
print("Shutdown") | ||
|
||
|
||
async def my_task(dep: str = TaskiqDepends(dependency)) -> None: | ||
print(dep.upper()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
import random | ||
|
||
from taskiq import TaskiqDepends | ||
|
||
|
||
def common_dep() -> int: | ||
return random.randint(1, 10) | ||
|
||
|
||
def dep1(cd: int = TaskiqDepends(common_dep)) -> int: | ||
return cd + 1 | ||
|
||
|
||
def dep2(cd: int = TaskiqDepends(common_dep, use_cache=False)) -> int: | ||
return cd + 2 | ||
|
||
|
||
def my_task( | ||
d1: int = TaskiqDepends(dep1), | ||
d2: int = TaskiqDepends(dep2), | ||
) -> int: | ||
return d1 + d2 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.