Skip to content

Commit

Permalink
Merge branch 'release/0.1.8'
Browse files Browse the repository at this point in the history
  • Loading branch information
s3rius committed Jan 26, 2023
2 parents 89c534b + 08f62c4 commit 820287a
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 10 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "taskiq"
version = "0.1.7"
version = "0.1.8"
description = "Distributed task queue with full async support"
authors = ["Pavel Kirilin <[email protected]>"]
maintainers = ["Pavel Kirilin <[email protected]>"]
Expand Down
16 changes: 15 additions & 1 deletion taskiq/cli/scheduler/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,20 @@ async def schedules_updater(
await asyncio.sleep(scheduler.refresh_delay)


def should_run(task: ScheduledTask) -> bool:
"""
Checks if it's time to run a task.
:param task: task to check.
:return: True if task must be sent.
"""
if task.cron is not None:
return is_now(task.cron)
if task.time is not None:
return task.time <= datetime.utcnow()
return False


async def run_scheduler(args: SchedulerArgs) -> None: # noqa: C901, WPS210, WPS213
"""
Runs scheduler loop.
Expand Down Expand Up @@ -83,7 +97,7 @@ async def run_scheduler(args: SchedulerArgs) -> None: # noqa: C901, WPS210, WPS
not_fired_tasks = []
for task in tasks:
try:
ready = is_now(task.cron)
ready = should_run(task)
except ValueError:
logger.warning(
"Cannot parse cron: %s for task: %s",
Expand Down
6 changes: 3 additions & 3 deletions taskiq/cli/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from importlib import import_module
from logging import getLogger
from pathlib import Path
from typing import Any, Generator
from typing import Any, Generator, List

logger = getLogger("taskiq.worker")

Expand Down Expand Up @@ -51,7 +51,7 @@ def import_object(object_spec: str) -> Any:
return getattr(module, import_spec[1])


def import_from_modules(modules: list[str]) -> None:
def import_from_modules(modules: List[str]) -> None:
"""
Import all modules from modules variable.
Expand All @@ -66,7 +66,7 @@ def import_from_modules(modules: list[str]) -> None:
logger.warning(f"Cannot import {module}")


def import_tasks(modules: list[str], pattern: str, fs_discover: bool) -> None:
def import_tasks(modules: List[str], pattern: str, fs_discover: bool) -> None:
"""
Import tasks modules.
Expand Down
5 changes: 3 additions & 2 deletions taskiq/schedule_sources/label_based.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ async def get_schedules(self) -> List["ScheduledTask"]:
if task.broker != self.broker:
continue
for schedule in task.labels.get("schedule", []):
if "cron" not in schedule:
if "cron" not in schedule and "time" not in schedule:
continue
labels = schedule.get("labels", {})
labels.update(task.labels)
Expand All @@ -38,7 +38,8 @@ async def get_schedules(self) -> List["ScheduledTask"]:
labels=labels,
args=schedule.get("args", []),
kwargs=schedule.get("kwargs", {}),
cron=schedule["cron"],
cron=schedule.get("cron"),
time=schedule.get("time"),
),
)
return schedules
17 changes: 14 additions & 3 deletions taskiq/scheduler/scheduler.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Callable, Dict, List
from dataclasses import dataclass, field
from datetime import datetime
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional

from taskiq.abc.broker import AsyncBroker
from taskiq.scheduler.merge_functions import preserve_all
Expand All @@ -16,7 +17,17 @@ class ScheduledTask:
labels: Dict[str, Any]
args: List[Any]
kwargs: Dict[str, Any]
cron: str
cron: Optional[str] = field(default=None)
time: Optional[datetime] = field(default=None)

def __post_init__(self) -> None:
"""
This method validates, that either `cron` or `time` field is present.
:raises ValueError: if cron and time are none.
"""
if self.cron is None and self.time is None:
raise ValueError("Either cron or datetime must be present.")


class TaskiqScheduler:
Expand Down

0 comments on commit 820287a

Please sign in to comment.