From e66d3e282a1c4e177ad327d92160a8eb24f82a7c Mon Sep 17 00:00:00 2001 From: Pavel Kirilin Date: Fri, 27 Jan 2023 01:50:06 +0400 Subject: [PATCH 1/4] Added datetime schedule. (#60) Signed-off-by: Pavel Kirilin --- taskiq/cli/scheduler/run.py | 16 +++++++++++++++- taskiq/cli/utils.py | 6 +++--- taskiq/scheduler/scheduler.py | 18 ++++++++++++++---- 3 files changed, 32 insertions(+), 8 deletions(-) diff --git a/taskiq/cli/scheduler/run.py b/taskiq/cli/scheduler/run.py index 50ea441..cece822 100644 --- a/taskiq/cli/scheduler/run.py +++ b/taskiq/cli/scheduler/run.py @@ -47,6 +47,20 @@ async def schedules_updater( await asyncio.sleep(scheduler.refresh_delay) +def should_run(task: ScheduledTask) -> bool: + """ + Checks if it's time to run a task. + + :param task: task to check. + :return: True if task must be sent. + """ + if task.cron is not None: + return is_now(task.cron) + if task.time is not None: + return task.time <= datetime.utcnow() + return False + + async def run_scheduler(args: SchedulerArgs) -> None: # noqa: C901, WPS210, WPS213 """ Runs scheduler loop. @@ -83,7 +97,7 @@ async def run_scheduler(args: SchedulerArgs) -> None: # noqa: C901, WPS210, WPS not_fired_tasks = [] for task in tasks: try: - ready = is_now(task.cron) + ready = should_run(task) except ValueError: logger.warning( "Cannot parse cron: %s for task: %s", diff --git a/taskiq/cli/utils.py b/taskiq/cli/utils.py index 0bc90c1..114c235 100644 --- a/taskiq/cli/utils.py +++ b/taskiq/cli/utils.py @@ -4,7 +4,7 @@ from importlib import import_module from logging import getLogger from pathlib import Path -from typing import Any, Generator +from typing import Any, Generator, List logger = getLogger("taskiq.worker") @@ -51,7 +51,7 @@ def import_object(object_spec: str) -> Any: return getattr(module, import_spec[1]) -def import_from_modules(modules: list[str]) -> None: +def import_from_modules(modules: List[str]) -> None: """ Import all modules from modules variable. @@ -66,7 +66,7 @@ def import_from_modules(modules: list[str]) -> None: logger.warning(f"Cannot import {module}") -def import_tasks(modules: list[str], pattern: str, fs_discover: bool) -> None: +def import_tasks(modules: List[str], pattern: str, fs_discover: bool) -> None: """ Import tasks modules. diff --git a/taskiq/scheduler/scheduler.py b/taskiq/scheduler/scheduler.py index e254e47..d909f7d 100644 --- a/taskiq/scheduler/scheduler.py +++ b/taskiq/scheduler/scheduler.py @@ -1,5 +1,6 @@ -from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, Callable, Dict, List +from dataclasses import dataclass, field +from datetime import datetime +from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional from taskiq.abc.broker import AsyncBroker from taskiq.scheduler.merge_functions import preserve_all @@ -16,7 +17,17 @@ class ScheduledTask: labels: Dict[str, Any] args: List[Any] kwargs: Dict[str, Any] - cron: str + cron: Optional[str] = field(default=None) + time: Optional[datetime] = field(default=None) + + def __post_init__(self) -> None: + """ + This method validates, that either `cron` or `time` field is present. + + :raises ValueError: if cron and time are none. + """ + if self.cron is None and self.time is None: + raise ValueError("Either cron or datetime must be present.") class TaskiqScheduler: @@ -44,4 +55,3 @@ async def startup(self) -> None: # pragma: no cover Here you can do stuff, like creating connections or anything you'd like. """ - await self.broker.startup() From 6f5e8166b145cb74cd1430c28fe0abbf37813f42 Mon Sep 17 00:00:00 2001 From: Pavel Kirilin Date: Fri, 27 Jan 2023 01:51:37 +0400 Subject: [PATCH 2/4] Returned broker startup. Signed-off-by: Pavel Kirilin --- taskiq/scheduler/scheduler.py | 1 + 1 file changed, 1 insertion(+) diff --git a/taskiq/scheduler/scheduler.py b/taskiq/scheduler/scheduler.py index d909f7d..2a406dc 100644 --- a/taskiq/scheduler/scheduler.py +++ b/taskiq/scheduler/scheduler.py @@ -55,3 +55,4 @@ async def startup(self) -> None: # pragma: no cover Here you can do stuff, like creating connections or anything you'd like. """ + await self.broker.startup() From 897e451b246f894a30060ebfedd1f1af767de1d7 Mon Sep 17 00:00:00 2001 From: Pavel Kirilin Date: Fri, 27 Jan 2023 01:54:14 +0400 Subject: [PATCH 3/4] Updated label based schedule source. Signed-off-by: Pavel Kirilin --- taskiq/schedule_sources/label_based.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/taskiq/schedule_sources/label_based.py b/taskiq/schedule_sources/label_based.py index ff716b5..aac4f19 100644 --- a/taskiq/schedule_sources/label_based.py +++ b/taskiq/schedule_sources/label_based.py @@ -28,7 +28,7 @@ async def get_schedules(self) -> List["ScheduledTask"]: if task.broker != self.broker: continue for schedule in task.labels.get("schedule", []): - if "cron" not in schedule: + if "cron" not in schedule and "time" not in schedule: continue labels = schedule.get("labels", {}) labels.update(task.labels) @@ -38,7 +38,8 @@ async def get_schedules(self) -> List["ScheduledTask"]: labels=labels, args=schedule.get("args", []), kwargs=schedule.get("kwargs", {}), - cron=schedule["cron"], + cron=schedule.get("cron"), + time=schedule.get("time"), ), ) return schedules From 08f62c4c0aac2f15481b306eae581f481f852fd0 Mon Sep 17 00:00:00 2001 From: Pavel Kirilin Date: Fri, 27 Jan 2023 01:57:31 +0400 Subject: [PATCH 4/4] Version bumped to 0.1.8. Signed-off-by: Pavel Kirilin --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 2be779d..abe25fd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "taskiq" -version = "0.1.7" +version = "0.1.8" description = "Distributed task queue with full async support" authors = ["Pavel Kirilin "] maintainers = ["Pavel Kirilin "]